From 2d697fc92b012bf274e70d0f70bf7e4717a59f31 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 15 Aug 2024 08:44:36 -0700 Subject: [PATCH] tests: handle spurious EWOULDBLOCK in io_async_fd (#6776) * tests: handle spurious EWOULDBLOCK in io_async_fd ## Motivation The `io_async_fd.rs` tests contain a `drain()` function, which currently performs synchronous reads from a UDS socket until it returns `io::ErrorKind::WouldBlock` (i.e., errno `EWOULDBLOCK`/`EAGAIN`). The *intent* behind this function is to ensure that all data has been drained from the UDS socket's buffer...which is what it appears to do...on Linux. On other systems, it appears that an `EWOULDBLOCK` or `EAGAIN` may be returned before enough data has been read from the UDS socket to result in the other end being notified that the socket is now writable. In particular, this appears to be the case on illumos, where the tests using this function hang forever (see [this comment][1] on PR #6769). To my knowledge, this behavior is still POSIX-compliant --- the reader will still be notified that the socket is readable, and if it were actually doing non-blocking IO, it would continue reading upon receipt of that notification. So, relying on `EWOULDBLOCK` to indicate that the socket has been sufficiently drained appears to rely on Linux/FreeBSD behavior that isn't necessarily portable to other Unices. ## Solution This commit changes the `drain()` function to take an argument for the number of bytes *written* to the socket previously, and continue looping until it has read that many bytes, regardless of whether `EWOULDBLOCK` is returned. This should ensure that the socket is drained on all POSIX-compliant systems, and indeed, the `io_async_fd::reset_writable` and `io_async_fd::poll_fns` tests no longer hang forever on illumos. I think making this change is an appropriate solution to the test failure here, as the `drain()` function is part of the test, rather than the code in Tokio *being* tested, and (as I mentioned above) the use of blocking reads on a non-blocking socket without a mechanism to continue reading when the socket becomes readable again is not really something a real life program seems likely to do. Ensuring that all the written bytes have been read by passing in a byte count seems more faithful to what the test is actually *trying* to do here, anyway. Thanks to @jclulow for debugging what was going on here! This change was cherry-picked from commit f18d6ed7d4e0724bbe14db5519d7c80b3227a1a9 from PR #6769, so that the fix can be merged separately. [1]: https://github.com/tokio-rs/tokio/pull/6769#issuecomment-2284753794 Signed-off-by: Eliza Weisman --- tokio/tests/io_async_fd.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index 3ab1cebd884..f4dcfcf0927 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -135,15 +135,14 @@ fn socketpair() -> (FileDescriptor, FileDescriptor) { fds } -fn drain(mut fd: &FileDescriptor) { +fn drain(mut fd: &FileDescriptor, mut amt: usize) { let mut buf = [0u8; 512]; - #[allow(clippy::unused_io_amount)] - loop { + while amt > 0 { match fd.read(&mut buf[..]) { - Err(e) if e.kind() == ErrorKind::WouldBlock => break, + Err(e) if e.kind() == ErrorKind::WouldBlock => {} Ok(0) => panic!("unexpected EOF"), Err(e) => panic!("unexpected error: {:?}", e), - Ok(_) => continue, + Ok(x) => amt -= x, } } } @@ -219,10 +218,10 @@ async fn reset_writable() { let mut guard = afd_a.writable().await.unwrap(); // Write until we get a WouldBlock. This also clears the ready state. - while guard - .try_io(|_| afd_a.get_ref().write(&[0; 512][..])) - .is_ok() - {} + let mut bytes = 0; + while let Ok(Ok(amt)) = guard.try_io(|_| afd_a.get_ref().write(&[0; 512][..])) { + bytes += amt; + } // Writable state should be cleared now. let writable = afd_a.writable(); @@ -234,7 +233,7 @@ async fn reset_writable() { } // Read from the other side; we should become writable now. - drain(&b); + drain(&b, bytes); let _ = writable.await.unwrap(); } @@ -386,7 +385,10 @@ async fn poll_fns() { let afd_b = Arc::new(AsyncFd::new(b).unwrap()); // Fill up the write side of A - while afd_a.get_ref().write(&[0; 512]).is_ok() {} + let mut bytes = 0; + while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) { + bytes += amt; + } let waker = TestWaker::new(); @@ -446,7 +448,7 @@ async fn poll_fns() { } // Make it writable now - drain(afd_b.get_ref()); + drain(afd_b.get_ref(), bytes); // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side) let _ = write_fut.await;