From 2d16c6230f8b48a71dd195f14cbc8f0ae77b9199 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Sun, 2 Jun 2024 09:08:41 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20add=20conn=20cancellation?= =?UTF-8?q?=20token?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/socket.rs | 5 ++++- lib/socket/server.rs | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/examples/socket.rs b/examples/socket.rs index 07b361b..abdad4f 100644 --- a/examples/socket.rs +++ b/examples/socket.rs @@ -6,7 +6,9 @@ use { rustler_core::{ bus::{self, SubscriberTrait}, rustlers::Quote, - socket::{self, event, Error, EventDispatcher, Outgoing, Request, Response}, + socket::{ + self, event, CancellationToken, Error, EventDispatcher, Outgoing, Request, Response, + }, }, std::sync::Arc, tokio::{join, sync::Mutex}, @@ -23,6 +25,7 @@ impl EventDispatcher for Dispatcher { data: event::Data, outgoing: Arc>, conn_id: String, + _cancel_token: CancellationToken, ) -> Result<()> { info!("Event: {}", event); info!("Data: {:?}", data); diff --git a/lib/socket/server.rs b/lib/socket/server.rs index bc50571..55e554d 100644 --- a/lib/socket/server.rs +++ b/lib/socket/server.rs @@ -21,6 +21,7 @@ pub use { }, WebSocketStream, }, + tokio_util::sync::CancellationToken, }; pub type Outgoing = SplitSink, Message>; @@ -33,6 +34,7 @@ pub trait EventDispatcher: Send { data: event::Data, outgoing: Arc>, conn_id: String, + cancel_token: CancellationToken, ) -> Result<()>; } @@ -105,9 +107,24 @@ where let stats = stats.clone(); let conn_id = uuid::Uuid::new_v4(); + // main cancellation token that will be used to cancel the connection + let cancel_tkn = CancellationToken::new(); + tokio::spawn(async move { - match Server::handle_connection(ws_stream, dispatcher, conn_id).await { - Ok(_) => info!("Connection {} closed", conn_id), + match Server::handle_connection( + ws_stream, + dispatcher, + conn_id, + // each connection will have a child token (child tokens can't cancel parent + // tokens but are cancelled when the parent token is cancelled) + cancel_tkn.child_token(), + ) + .await + { + Ok(_) => { + cancel_tkn.cancel(); + info!("Connection {} closed", conn_id); + } Err(e) => error!("Error handling connection: {:?}", e), }; @@ -126,13 +143,20 @@ where stream: WebSocketStream, event_dispatcher: ED, conn_id: uuid::Uuid, + cancel_sgn: CancellationToken, ) -> Result<()> { let (outgoing, mut incoming) = stream.split(); let synced_outgoing = Arc::new(Mutex::new(outgoing)); while let Some(msg) = incoming.next().await { - Server::handle_message(msg?, &event_dispatcher, synced_outgoing.clone(), conn_id) - .await?; + Server::handle_message( + msg?, + &event_dispatcher, + synced_outgoing.clone(), + conn_id, + cancel_sgn.clone(), + ) + .await?; } Ok(()) @@ -144,12 +168,19 @@ where event_dispatcher: &ED, outgoing: Arc>, conn_id: uuid::Uuid, + cancel_sgn: CancellationToken, ) -> Result { if msg.is_text() || msg.is_binary() { if let Ok(event) = serde_json::from_str::(&msg.to_string()) { let outgoing = Arc::clone(&outgoing); let result = event_dispatcher - .dispatch(event.event, event.data, outgoing, conn_id.into()) + .dispatch( + event.event, + event.data, + outgoing, + conn_id.into(), + cancel_sgn, + ) .await; match result {