Skip to content

Commit

Permalink
Remove infinite loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Sytten committed Oct 26, 2024
1 parent 23e5af1 commit 1534c28
Showing 1 changed file with 24 additions and 29 deletions.
53 changes: 24 additions & 29 deletions src/next/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
/// This type implements `IntoFuture` and should usually be spawned
/// with an async runtime.
pub struct ConnectionActor {
client: Option<async_channel::Receiver<ConnectionCommand>>,
client: async_channel::Receiver<ConnectionCommand>,
connection: Box<dyn ObjectSafeConnection>,
operations: HashMap<usize, async_channel::Sender<Value>>,
keep_alive: KeepAliveSettings,
Expand All @@ -39,7 +39,7 @@ impl ConnectionActor {
keep_alive: KeepAliveSettings,
) -> Self {
ConnectionActor {
client: Some(client),
client,
connection,
operations: HashMap::new(),
keep_alive_actor: Box::pin(keep_alive.run()),
Expand Down Expand Up @@ -160,39 +160,34 @@ impl ConnectionActor {
Message(Option<Message>),
KeepAlive(Option<ConnectionCommand>),
}
loop {
if let Some(client) = &mut self.client {
let command = async { Select::Command(client.recv().await.ok()) };
let message = async { Select::Message(self.connection.receive().await) };
let keep_alive = async { Select::KeepAlive(self.keep_alive_actor.next().await) };

match command.or(message).or(keep_alive).await {
Select::Command(Some(command)) | Select::KeepAlive(Some(command)) => {
return Some(Next::Command(command));
}
Select::Command(None) => {
self.client.take();
continue;
}
Select::Message(message) => {
self.keep_alive_actor = Box::pin(self.keep_alive.run());
return Some(Next::Message(message?));
}
Select::KeepAlive(None) => {
return Some(self.keep_alive.report_timeout());
}
}
}

if self.operations.is_empty() {
// If client has disconnected and we have no running operations
// then we should shut down
return None;
let command = async { Select::Command(self.client.recv().await.ok()) };
let message = async { Select::Message(self.connection.receive().await) };
let keep_alive = async { Select::KeepAlive(self.keep_alive_actor.next().await) };

match command.or(message).or(keep_alive).await {
Select::Command(Some(command)) | Select::KeepAlive(Some(command)) => {
Some(Next::Command(command))
}
Select::Command(None) => {
// All clients have disconnected
None
}
Select::Message(message) => {
self.keep_alive_actor = Box::pin(self.keep_alive.run());
Some(Next::Message(message?))
}
Select::KeepAlive(None) => Some(self.keep_alive.report_timeout()),
}
}
}

impl Drop for ConnectionActor {
fn drop(&mut self) {
println!("Dropping ConnectionActor");
}
}

enum Next {
Command(ConnectionCommand),
Message(Message),
Expand Down

0 comments on commit 1534c28

Please sign in to comment.