Skip to content

Commit

Permalink
Use ntex-service 3.0 (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored May 28, 2024
1 parent 766c4a8 commit 971a188
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 104 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.0.0] - 2024-05-28

* Use ntex-service 3.0

## [2.1.7] - 2024-05-12

* Cleanup pending transfers and deliveries on link detach
Expand Down
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "2.1.7"
version = "3.0.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -24,8 +24,7 @@ default = []
frame-trace = []

[dependencies]
ntex = "1"
ntex-util = "1.0.1"
ntex = "2"
ntex-amqp-codec = "0.9"

bitflags = "2"
Expand All @@ -38,7 +37,7 @@ uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
env_logger = "0.11"
rand = "0.8"
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
ntex-amqp = { path = ".", features = ["frame-trace"] }

[patch.crates-io]
Expand Down
152 changes: 71 additions & 81 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::collections::VecDeque;
use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll};
use std::{
cell, future::poll_fn, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll,
};

use ntex::service::{Pipeline, PipelineCall, Service, ServiceCtx};
use ntex::service::{Pipeline, PipelineBinding, PipelineCall, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Sleep};
use ntex::util::{ready, BoxFuture, Either};
use ntex::util::{ready, Either};
use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker};

use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame};
Expand All @@ -24,13 +26,12 @@ impl ControlQueue {
}

/// Amqp server dispatcher service.
pub(crate) struct Dispatcher<Sr, Ctl: Service<ControlFrame>> {
pub(crate) struct Dispatcher<Sr: Service<types::Message>, Ctl: Service<ControlFrame>> {
sink: Connection,
service: Pipeline<Sr>,
ctl_service: Pipeline<Ctl>,
service: PipelineBinding<Sr, types::Message>,
ctl_service: PipelineBinding<Ctl, ControlFrame>,
ctl_fut: cell::RefCell<Vec<(ControlFrame, PipelineCall<Ctl, ControlFrame>)>>,
ctl_queue: Rc<ControlQueue>,
shutdown: cell::RefCell<Option<BoxFuture<'static, ()>>>,
expire: Sleep,
idle_timeout: Millis,
}
Expand All @@ -51,17 +52,16 @@ where
Dispatcher {
sink,
idle_timeout,
service,
ctl_service,
ctl_queue,
service: service.bind(),
ctl_service: ctl_service.bind(),
ctl_fut: cell::RefCell::new(Vec::new()),
shutdown: cell::RefCell::new(None),
expire: sleep(idle_timeout),
}
}

fn call_control_service(&self, frame: ControlFrame) {
let fut = self.ctl_service.call_static(frame.clone());
let fut = self.ctl_service.call(frame.clone());
self.ctl_fut.borrow_mut().push((frame, fut));
self.ctl_queue.waker.wake();
}
Expand Down Expand Up @@ -152,7 +152,7 @@ where
let frm = frm.clone();
let fut = self
.service
.call_static(types::Message::Attached(frm.clone(), link.clone()));
.call(types::Message::Attached(frm.clone(), link.clone()));
let _ = ntex::rt::spawn(async move {
let result = fut.await;
if let Err(err) = result {
Expand Down Expand Up @@ -200,80 +200,72 @@ where
type Response = Option<AmqpFrame>;
type Error = AmqpDispatcherError;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ctl_queue.waker.register(cx.waker());
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
self.ctl_queue.waker.register(cx.waker());

// idle timeout
self.handle_idle_timeout(cx);
// idle timeout
self.handle_idle_timeout(cx);

// process control frame
let mut control_fut_pending = !self.handle_control_fut(cx)?;
// process control frame
let mut control_fut_pending = !self.handle_control_fut(cx)?;

// check readiness
let service_poll = self.service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Publish service readiness check failed: {:?}",
self.sink.tag(),
err
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;
// check readiness
let service_poll = self.service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Publish service readiness check failed: {:?}",
self.sink.tag(),
err
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;

let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Control service readiness check failed: {:?}",
self.sink.tag(),
err
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;
let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| {
let err = Error::from(err);
log::error!(
"{}: Control service readiness check failed: {:?}",
self.sink.tag(),
err
);
let _ = self.sink.close_with_error(err);
AmqpDispatcherError::Service
})?;

// enqueue pending control frames
if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() {
self.ctl_queue
.pending
.borrow_mut()
.drain(..)
.for_each(|frame| {
self.call_control_service(frame);
});
control_fut_pending = true;
}
// enqueue pending control frames
if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() {
self.ctl_queue
.pending
.borrow_mut()
.drain(..)
.for_each(|frame| {
self.call_control_service(frame);
});
control_fut_pending = true;
}

if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
})
.await
}

fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut shutdown = self.shutdown.borrow_mut();
if !shutdown.is_some() {
self.sink
.0
.get_mut()
.set_error(AmqpProtocolError::Disconnected);
let fut = self
.ctl_service
.call_static(ControlFrame::new_kind(ControlFrameKind::Closed));
*shutdown = Some(Box::pin(async move {
let _ = fut.await;
}));
}
async fn shutdown(&self) {
self.sink
.0
.get_mut()
.set_error(AmqpProtocolError::Disconnected);
let _ = self
.ctl_service
.call(ControlFrame::new_kind(ControlFrameKind::Closed))
.await;

let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx);
let res1 = self.service.poll_shutdown(cx);
let res2 = self.ctl_service.poll_shutdown(cx);
if res0.is_pending() || res1.is_pending() || res2.is_pending() {
Poll::Pending
} else {
Poll::Ready(())
}
self.service.shutdown().await;
self.ctl_service.shutdown().await;
}

async fn call(
Expand Down Expand Up @@ -334,7 +326,7 @@ where
}
types::Action::DetachReceiver(link, frm) => {
let lnk = link.clone();
let fut = self.service.call_static(types::Message::Detached(lnk));
let fut = self.service.call(types::Message::Detached(lnk));
let _ = spawn(async move {
let _ = fut.await;
});
Expand All @@ -352,9 +344,7 @@ where
})
.collect();

let fut = self
.service
.call_static(types::Message::DetachedAll(receivers));
let fut = self.service.call(types::Message::DetachedAll(receivers));
let _ = spawn(async move {
let _ = fut.await;
});
Expand Down
37 changes: 22 additions & 15 deletions src/router.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::poll_fn, marker, rc::Rc};
use std::{marker, rc::Rc};

use ntex::router::{IntoPattern, Router as PatternRouter};
use ntex::service::{
Expand Down Expand Up @@ -141,34 +141,41 @@ impl<S: 'static> Service<Message> for RouterService<S> {
log::trace!("Releasing handler service for {}", link.name());
let name = link.name().clone();
let _ = ntex::rt::spawn(async move {
poll_fn(move |cx| srv.poll_shutdown(cx)).await;
srv.shutdown().await;
log::trace!("Handler service for {} has shutdown", name);
});
}
Ok(())
}
Message::DetachedAll(links) => {
let futs: Vec<_> = links
let links: Vec<_> = links
.into_iter()
.filter_map(|link| {
self.0.get_mut().handlers.remove(&link).and_then(|srv| {
srv.map(|srv| {
log::trace!(
"Releasing handler service for {} (session ended)",
link.name()
);
poll_fn(move |cx| srv.poll_shutdown(cx))
})
})
self.0
.get_mut()
.handlers
.remove(&link)
.and_then(move |srv| srv.map(|srv| (link, srv)))
})
.collect();

log::trace!(
"Shutting down {} handler services (session ended)",
futs.len()
links.len()
);

let _ = ntex::rt::spawn(async move {
let futs: Vec<_> = links
.iter()
.map(|(link, srv)| {
log::trace!(
"Releasing handler service for {} (session ended)",
link.name()
);
srv.shutdown()
})
.collect();

let len = futs.len();
let _ = join_all(futs).await;
log::trace!(
Expand Down Expand Up @@ -279,8 +286,8 @@ where
type Response = Outcome;
type Error = Error;

ntex::forward_poll_ready!(service);
ntex::forward_poll_shutdown!(service);
ntex::forward_ready!(service);
ntex::forward_shutdown!(service);

async fn call(
&self,
Expand Down
23 changes: 19 additions & 4 deletions src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,15 @@ where
type Response = ();
type Error = ServerError<H::Error>;

ntex::forward_poll_ready!(handshake, ServerError::Service);
ntex::forward_poll_shutdown!(handshake);
#[inline]
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.handshake.ready().await.map_err(ServerError::Service)
}

#[inline]
async fn shutdown(&self) {
self.handshake.shutdown().await
}

async fn call(
&self,
Expand All @@ -256,9 +263,17 @@ where
type Response = ();
type Error = ServerError<H::Error>;

ntex::forward_poll_ready!(handshake, ServerError::Service);
ntex::forward_poll_shutdown!(handshake);
#[inline]
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.handshake.ready().await.map_err(ServerError::Service)
}

#[inline]
async fn shutdown(&self) {
self.handshake.shutdown().await
}

#[inline]
async fn call(
&self,
req: IoBoxed,
Expand Down

0 comments on commit 971a188

Please sign in to comment.