From ae000732a2a9f337078abccaff70dea545711f29 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Sun, 2 Jun 2024 01:08:20 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20connection=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 27 +++++++++++++++------------ Cargo.toml | 21 +++++++++------------ examples/socket.rs | 2 ++ lib/socket/server.rs | 17 ++++++++++++----- 4 files changed, 38 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35e4733..c4b8d8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1644,9 +1644,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.4" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" dependencies = [ "bytes", "prost-derive", @@ -1675,9 +1675,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.4" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", "itertools", @@ -1762,9 +1762,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.25.3" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" dependencies = [ "async-trait", "bytes", @@ -1949,6 +1949,7 @@ dependencies = [ "tokio-util", "tonic", "tonic-build", + "uuid", ] [[package]] @@ -2696,9 +2697,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", "bytes", @@ -2723,9 +2724,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -2745,9 +2746,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +checksum = "d46baf930138837d65e25e3b33be49c9228579a6135dbf756b5cb9e4283e7cef" dependencies = [ "futures-util", "log", @@ -3013,6 +3014,8 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ + "getrandom", + "rand", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 3a0ade6..a973a9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,17 +15,17 @@ path = "lib/lib.rs" # utils eyre = { version = "0.6.12", default-features = false } dotenvy = "0.15.7" -chrono = "0.4.37" +chrono = "0.4.38" getset = "0.1.2" # async -tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] } -async-trait = "0.1.79" +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } +async-trait = "0.1.80" # grpc & websocket -tokio-tungstenite = { version = "0.21.0" } +tokio-tungstenite = { version = "0.22.0" } tonic = "0.11.0" -prost = "0.12.3" # protocol buffers +prost = "0.12.6" # protocol buffers futures = "0.3.30" tokio-util = "0.7.11" @@ -39,22 +39,19 @@ sea-orm-migration = { version = "0.12.15", features = [ "runtime-tokio-native-tls", "sqlx-sqlite", ] } -redis = { version = "0.25.3", features = ["tokio-comp"] } +redis = { version = "0.25.4", features = ["tokio-comp"] } # other serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" - -[dependencies.lool] -version = "^0.3.2" # crates: disable-check -registry = "lugit" -features = [ +uuid = { version = "1.8.0", features = ["v4", "fast-rng"] } +lool = { version = "^0.3.2", registry = "lugit", features = [ "cli.stylize", "logger", "sched.tokio", "sched.rule-recurrence", "macros", -] +] } [build-dependencies] tonic-build = "0.11.0" diff --git a/examples/socket.rs b/examples/socket.rs index ddaeb84..07b361b 100644 --- a/examples/socket.rs +++ b/examples/socket.rs @@ -22,9 +22,11 @@ impl EventDispatcher for Dispatcher { event: String, data: event::Data, outgoing: Arc>, + conn_id: String, ) -> Result<()> { info!("Event: {}", event); info!("Data: {:?}", data); + info!("Connection ID: {}", conn_id); let mut sx = bus::redis::subscriber::(&"redis://127.0.0.1/").await?; let mut quote_feed = sx.stream().await?; diff --git a/lib/socket/server.rs b/lib/socket/server.rs index 81df8e0..bc50571 100644 --- a/lib/socket/server.rs +++ b/lib/socket/server.rs @@ -32,6 +32,7 @@ pub trait EventDispatcher: Send { event: String, data: event::Data, outgoing: Arc>, + conn_id: String, ) -> Result<()>; } @@ -101,11 +102,12 @@ where if let Ok(ws_stream) = ws_stream { stats.inc_current_clients(); - let stats = stats.clone(); + let conn_id = uuid::Uuid::new_v4(); + tokio::spawn(async move { - match Server::handle_connection(ws_stream, dispatcher).await { - Ok(_) => info!("Connection closed"), + match Server::handle_connection(ws_stream, dispatcher, conn_id).await { + Ok(_) => info!("Connection {} closed", conn_id), Err(e) => error!("Error handling connection: {:?}", e), }; @@ -123,12 +125,14 @@ where async fn handle_connection( stream: WebSocketStream, event_dispatcher: ED, + conn_id: uuid::Uuid, ) -> Result<()> { let (outgoing, mut incoming) = stream.split(); let synced_outgoing = Arc::new(Mutex::new(outgoing)); while let Some(msg) = incoming.next().await { - Server::handle_message(msg?, &event_dispatcher, synced_outgoing.clone()).await?; + Server::handle_message(msg?, &event_dispatcher, synced_outgoing.clone(), conn_id) + .await?; } Ok(()) @@ -139,11 +143,14 @@ where msg: Message, event_dispatcher: &ED, outgoing: Arc>, + conn_id: uuid::Uuid, ) -> Result { if msg.is_text() || msg.is_binary() { if let Ok(event) = serde_json::from_str::(&msg.to_string()) { let outgoing = Arc::clone(&outgoing); - let result = event_dispatcher.dispatch(event.event, event.data, outgoing).await; + let result = event_dispatcher + .dispatch(event.event, event.data, outgoing, conn_id.into()) + .await; match result { Ok(_) => {}