fix send race by handling logs in the main thread when async #1831

Open
wants to merge 9 commits into
base: async


technillogue
Contributor

@technillogue technillogue commented Jul 26, 2024

If events can be written both from an event loop and from a thread, race conditions can occur. This PR tries to fix this by conditionally moving the stream redirector into the main thread when we switch to async, and registering the file descriptors with the main select loop instead of using epoll in a thread.
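To illustrate the "register the file descriptors with the main loop" idea, here is a minimal sketch, not the code in this PR. It assumes a Unix selector event loop (where `loop.add_reader` is available) and uses a plain `os.pipe()` as a stand-in for a redirected stdout/stderr; `capture_pipe_once` and `demo` are hypothetical names.

```python
import asyncio
import os


async def capture_pipe_once(payload: bytes) -> bytes:
    """Write payload to a pipe and read it back from the event loop thread."""
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    received = loop.create_future()

    def on_readable() -> None:
        # Called by the event loop itself, so no extra thread or lock is involved.
        data = os.read(read_fd, 65536)
        loop.remove_reader(read_fd)
        if not received.done():
            received.set_result(data)

    # Register the read end with the loop's selector instead of polling in a thread.
    loop.add_reader(read_fd, on_readable)
    os.write(write_fd, payload)
    try:
        return await asyncio.wait_for(received, timeout=5)
    finally:
        os.close(read_fd)
        os.close(write_fd)


def demo() -> bytes:
    return asyncio.run(capture_pipe_once(b"log line\n"))
```

Because the callback runs on the loop thread, anything it forwards to a StreamWriter is written from the same thread as the other coroutines.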


This solves a problem similar to #1758: currently, emitting a metric or yielding an output can in some cases result in an EAGAIN error, and we believe Done events are sometimes dropped. In async land, we shouldn't need a lock; multiple writers should be able to write to a StreamWriter safely. We also keep a sync Lock when the predictor is not async.

However, I'm still not sure if StreamWriter is thread-safe. After some research, StreamWriter.write calls self._transport.write and when StreamWriter is wrapping a unix socket and using the default selector, the _transport is a _SelectorSocketTransport, which calls [self._buffer.extend](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L1077), and self._buffer is a [bytearray](https://github.com/python/cpython/blob/3.11/Lib/asyncio/selector_events.py#L761). However, on 3.12, [append is called](https://github.com/python/cpython/blob/3.12/Lib/asyncio/selector_events.py#L1091) instead of extend.

After more soul-searching, I think write should be basically thread-safe: the two critical calls, socket.send() and bytearray.append/extend, should each be represented as a single bytecode instruction plus some native code which works on Python objects and does not release the GIL. As far as I understand, Python threads are switched "between bytecodes". Even though the individual calls are thread-safe, the overall write method is not quite thread-safe: a relevant thread switch could occur between the if not self._buffer, _sock.send, and _buffer.extend lines. In that case, the race could result in data being sent out of order or incorrectly delayed/sent early, but I think it shouldn't be corrupted, and we don't really care about the ordering of log lines vs outputs.
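The check / send / buffer shape described above can be sketched with a toy model. This is a simplified illustration, not CPython's actual `_SelectorSocketTransport`; `ToyTransport`, `FakeSocket`, and `demo` are hypothetical names, and the fake socket accepts only a few bytes per call so that the buffering path is exercised.

```python
class FakeSocket:
    """Hypothetical socket stand-in that accepts at most `capacity` bytes per send."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.sent = bytearray()

    def send(self, data: bytes) -> int:
        accepted = data[: self.capacity]
        self.sent.extend(accepted)
        return len(accepted)


class ToyTransport:
    """Simplified model of the three steps in a selector transport's write."""

    def __init__(self, sock: FakeSocket) -> None:
        self._sock = sock
        self._buffer = bytearray()

    def write(self, data: bytes) -> None:
        if not self._buffer:  # (1) check: nothing is queued yet
            try:
                n = self._sock.send(data)  # (2) try to send directly
            except BlockingIOError:
                n = 0
            data = data[n:]
            if not data:
                return
        # (3) queue whatever could not be sent immediately
        self._buffer.extend(data)


def demo():
    sock = FakeSocket(capacity=3)
    transport = ToyTransport(sock)
    transport.write(b"hello")  # sends b"hel", queues b"lo"
    transport.write(b"!")      # buffer is not empty, so this is queued behind b"lo"
    return bytes(sock.sent), bytes(transport._buffer)
```

A thread switch between steps (1), (2), and (3) is where two writers could interleave: each step leaves the bytes intact, but their relative order is no longer guaranteed.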

I can see two solutions:

  1. Ideally, we would probably add an alternate implementation of StreamWriter that uses the main event loop and wraps stderr/stdout in StreamReaders, start a separate task for each stream, and use await wrapped_stream.read() so that threads are not necessary. There's a tricky moment where we need to use the threaded StreamRedirector to capture logs while the predictor is being imported, since we don't know yet whether we're going to be async or not.
  2. Alternatively, we could try to use a queue or deque to communicate between threads, so instead of calling stream_write_hook StreamRedirector would do queue.put, the main event loop would do queue.get and not worry about thread safety for the pipe. The problem with this is that asyncio.Queue is not thread-safe and queue.Queue could block the event loop. You could use a busy wait or similar and get_nowait, but that has other downsides.
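For the second option, one possible variant that avoids both problems is to keep an `asyncio.Queue` but have the thread schedule the put on the loop with `loop.call_soon_threadsafe`. This is a minimal sketch under that assumption, not something this PR implements; `collect_from_thread` and `demo` are hypothetical names, and `None` is used as an end-of-stream sentinel for the example only.

```python
import asyncio
import threading


async def collect_from_thread(lines: list) -> list:
    """Receive items produced in a worker thread without blocking the event loop."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # unbounded, so put_nowait cannot raise QueueFull

    def producer() -> None:
        # asyncio.Queue is not thread-safe, so never touch it directly from here;
        # instead, ask the loop to run put_nowait on the loop thread.
        for line in lines:
            loop.call_soon_threadsafe(queue.put_nowait, line)
        loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more items

    thread = threading.Thread(target=producer)
    thread.start()

    received = []
    while True:
        item = await queue.get()  # suspends this task only, not the whole loop
        if item is None:
            break
        received.append(item)
    thread.join()
    return received


def demo() -> list:
    return asyncio.run(collect_from_thread(["a", "b", "c"]))
```

The trade-off is that the queue is unbounded here, so a thread that logs faster than the loop drains would grow memory rather than apply backpressure.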

Previously, AsyncConnection would only be used in _loop_async, and _events.send would always be used, which usually makes a write(2) syscall immediately. In contrast, AsyncConnection has a StreamWriter, which should be safe to call from different coroutines.
@technillogue technillogue force-pushed the syl/cleaner-review-fix-send-race-async branch from 1f08a8b to 5417f10 Compare July 26, 2024 04:16
@nickstenning
Member

Without a failing test to demonstrate the problem you're trying to solve, I really don't know how to evaluate this. Based on my limited understanding, this could fix the problem, but it could also not. It could also introduce new bugs that are even worse than the one we're trying to fix. I just don't have any framework to evaluate the change.

If we can't reproduce the bug in a test, I do wonder whether we really understand what the bug we're chasing even is.

@technillogue
Contributor Author

thanks, that's very reasonable, I'll get on a test

Comment on lines +28 to +31
# we don't want to see EAGAIN, we'd rather wait
# however, perhaps this is wrong and in some cases this could still block terribly
# sock.setblocking(False)
sock.setblocking(True)
Contributor


I'm having trouble understanding this in the context of the commented out code. Are those concerns for setblocking(True)? It'd be nice for this comment to provide enough relevant context for someone to pick this up if we need to revisit this behavior.
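For context on what the two settings trade off, here is a small sketch using only the standard library; it is an illustration, not code from this PR, and `fill_nonblocking_socket` is a hypothetical name. It fills the kernel buffer of a non-blocking socket whose peer never reads, which is the situation where a send fails with EAGAIN.

```python
import errno
import socket


def fill_nonblocking_socket() -> int:
    """Send on a non-blocking socket until the buffer is full; return the errno raised."""
    a, b = socket.socketpair()
    try:
        a.setblocking(False)
        chunk = b"x" * 65536
        while True:
            try:
                a.send(chunk)  # nobody reads from b, so the buffer eventually fills
            except BlockingIOError as exc:
                return exc.errno
    finally:
        a.close()
        b.close()
```

With `setblocking(True)` the same loop would not raise; it would wait inside `send` until the peer drains the buffer, which is the "could still block terribly" case the code comment worries about.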

@mattt mattt mentioned this pull request Jul 31, 2024
@technillogue technillogue changed the title fix send race [review] fix send race Oct 22, 2024
@technillogue technillogue changed the title fix send race fix send race by handling logs in the main thread when async Oct 22, 2024

3 participants