diff --git a/.vscode/settings.json b/.vscode/settings.json index 0db21e3..b39ce23 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,7 +11,7 @@ ".task": true, ".cargo": true, // ".github": true, - "rustfmt.toml": true, + // "rustfmt.toml": true, // "**/**/Cargo.toml": true, // ๐Ÿ“ฆ diff --git a/Cargo.lock b/Cargo.lock index 3258f57..23ba3f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,46 +137,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix 0.37.27", - "slab", - "socket2 0.4.10", - "waker-fn", -] - -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -501,15 +461,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -689,15 +640,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.0.2" @@ -803,7 +745,6 @@ dependencies = [ "futures-core", "futures-task", "futures-util", - "num_cpus", ] [[package]] @@ -823,21 +764,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.30" @@ -861,18 +787,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" -[[package]] -name = "futures-time" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6404853a6824881fe5f7d662d147dc4e84ecd2259ba0378f272a71dab600758a" -dependencies = [ - "async-channel", - "async-io", - "futures-core", - "pin-project-lite", -] - [[package]] name = "futures-util" version = "0.3.30" @@ -937,16 +851,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] -name = "gloo-timers" -version = "0.2.6" +name = "glob-match" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] +checksum = "9985c9503b412198aa4197559e9a318524ebc4519c229bfa05a535828c950b9d" [[package]] name = "h2" @@ -1111,7 +1019,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2", "tokio", "tower-service", "tracing", @@ -1200,26 +1108,6 @@ dependencies = [ "syn 2.0.60", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "itertools" version = "0.12.1" @@ -1276,12 +1164,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1306,13 +1188,14 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "lool" -version = "0.2.1" +version = "0.3.2" source = "sparse+http://lugit.local/api/packages/lucodear/cargo/" -checksum = "79d0803f1329280a3e66cf9a0aad94d5a012514eb2072e6be9ea5a4b645ac2d7" +checksum = "749ebc401b1a8a0bcc99daf6172f1f8e2ede7d6658150937922033208bbd28e4" dependencies = [ "bitflags 2.5.0", "chrono", "eyre", + "glob-match", "log", "num-traits", "tokio", @@ -1582,12 +1465,6 @@ dependencies = [ "syn 2.0.60", ] -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.12.1" @@ -1701,22 +1578,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -1914,7 +1775,7 @@ dependencies = [ "pin-project-lite", "ryu", "sha1_smol", - "socket2 0.5.6", + "socket2", "tokio", "tokio-util", "url", @@ -2053,20 +1914,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustix" -version = "0.37.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.34" @@ -2076,7 +1923,7 @@ dependencies = [ "bitflags 2.5.0", "errno", "libc", - "linux-raw-sys 0.4.13", + "linux-raw-sys", "windows-sys 0.52.0", ] @@ -2093,11 +1940,13 @@ dependencies = [ "lool", "prost", "redis", - "rxrust", "sea-orm", "sea-orm-migration", + "serde", + "serde_json", "tokio", "tokio-tungstenite", + "tokio-util", "tonic", "tonic-build", ] @@ -2108,21 +1957,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" -[[package]] -name = "rxrust" -version = "1.0.0-beta.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9a529052da83e8897bb1f40459a0837b3c048eafabfae3bf1e7c050146b8141" -dependencies = [ - "futures", - "futures-time", - "gloo-timers", - "once_cell", - "pin-project-lite", - "smallvec", - "wasm-bindgen-futures", -] - [[package]] name = "ryu" version = "1.0.17" @@ -2334,18 +2168,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.198" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", @@ -2354,9 +2188,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", @@ -2431,16 +2265,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.6" @@ -2789,8 +2613,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand 2.0.2", - "rustix 0.38.34", + "fastrand", + "rustix", "windows-sys 0.52.0", ] @@ -2882,7 +2706,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2 0.5.6", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -2933,16 +2757,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -3205,12 +3028,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" - [[package]] name = "want" version = "0.3.1" @@ -3257,18 +3074,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.92" @@ -3298,16 +3103,6 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" -[[package]] -name = "web-sys" -version = "0.3.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "whoami" version = "1.5.1" @@ -3318,28 +3113,6 @@ dependencies = [ "wasite", ] -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 4f1959b..531ba7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,9 @@ async-trait = "0.1.79" # grpc & websocket tokio-tungstenite = { version = "0.21.0" } tonic = "0.11.0" -prost = "0.12.3" # protocol buffers +prost = "0.12.3" # protocol buffers +futures = "0.3.30" +tokio-util = "0.7.11" # database sea-orm = { version = "0.12.15", features = [ @@ -38,11 +40,13 @@ sea-orm-migration = { version = "0.12.15", features = [ "sqlx-sqlite", ] } redis = { version = "0.25.3", features = ["tokio-comp"] } -futures = "0.3.30" -rxrust = "1.0.0-beta.7" + +# other +serde = { version = "1.0.203", features = ["derive"] } +serde_json = "1.0.117" [dependencies.lool] -version = "^0.2.0" # crates: disable-check +version = "^0.3.2" # crates: disable-check registry = "lugit" features = [ "cli.stylize", diff --git a/examples/bus-sub.rs b/examples/bus-sub.rs index b98312f..7b04e27 100644 --- a/examples/bus-sub.rs +++ b/examples/bus-sub.rs @@ -1,16 +1,19 @@ use { eyre::{set_hook, DefaultHandler, Result}, + futures::{future, StreamExt}, rustler_core::{ bus::{self, SubscriberTrait}, rustlers::{Quote, Ticker}, }, - rxrust::observable::{ObservableExt, ObservableItem}, + tokio::sync::mpsc, }; #[tokio::main] async fn main() -> Result<()> { set_hook(Box::new(DefaultHandler::default_with))?; + let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); + let mut sx = bus::redis::subscriber::(&"redis://127.0.0.1/").await?; let ticker = Ticker { @@ -19,12 +22,24 @@ async fn main() -> Result<()> { quote_asset: None, }; - let _obs = sx.stream().await?.filter(move |quote| quote.belongs_to(&ticker)).subscribe(|v| { - println!("Received quote: {}", v); + let mut stream = + sx.stream().await?.filter(move |quote| future::ready(quote.belongs_to(&ticker))); + + tokio::spawn(async move { + // cancel the streaming after 10 seconds + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + println!("Cancelling stream"); + cancel_tx.send(()).await.unwrap(); }); - // wait for 10 seconds - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + while let Some(quote) = stream.next().await { + println!("Received quote: {}", quote); + if cancel_rx.try_recv().is_ok() { + break; + } + } + + println!("Stream cancelled"); Ok(()) } diff --git a/examples/socket.rs b/examples/socket.rs new file mode 100644 index 0000000..ddaeb84 --- /dev/null +++ b/examples/socket.rs @@ -0,0 +1,89 @@ +use { + async_trait::async_trait, + eyre::Result, + futures::{SinkExt, StreamExt}, + lool::logger::{error, info, ConsoleLogger, Level}, + rustler_core::{ + bus::{self, SubscriberTrait}, + rustlers::Quote, + socket::{self, event, Error, EventDispatcher, Outgoing, Request, Response}, + }, + std::sync::Arc, + tokio::{join, sync::Mutex}, +}; + +#[derive(Clone)] +struct Dispatcher {} + +#[async_trait] +impl EventDispatcher for Dispatcher { + async fn dispatch( + &self, + event: String, + data: event::Data, + outgoing: Arc>, + ) -> Result<()> { + info!("Event: {}", event); + info!("Data: {:?}", data); + + let mut sx = bus::redis::subscriber::(&"redis://127.0.0.1/").await?; + let mut quote_feed = sx.stream().await?; + + tokio::spawn(async move { + while let Some(quote) = quote_feed.next().await { + let response = serde_json::to_string("e).unwrap(); + let mut o = outgoing.lock().await; + + if let Err(e) = o.send(response.into()).await { + // if error is AlreadyClosed or ConnectionClosed, then break the loop + match e { + Error::AlreadyClosed | Error::ConnectionClosed => { + break; + } + _ => { + error!("Error sending message: {:?}", e); + } + } + } + } + + info!("Hasta la vista, baby!"); + }); + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + ConsoleLogger::builder() + .with_level(Level::Trace) + .with_name("rustler") + .ignore("tungstenite::protocol") + .ignore("tungstenite::protocol::frame*") + .ignore("tokio_tungstenite::compat*") + .ignore("tokio_tungstenite") + .install()?; + + let dispatcher = Dispatcher {}; + let mut ws_server = socket::Server::new("127.0.0.1", "9002", dispatcher).await?; + + let handshaker = |_res: &Request, response: Response| { + Ok(response) + + // or fail the handshake, e.g. because of authentication failure + // + // let (mut parts, _) = response.into_parts(); + // parts.status = StatusCode::UNAUTHORIZED; + // let res = ErrorResponse::from_parts(parts, None); + // Err(res) + // + // or + // let res = Response::builder().status(401).body(None).unwrap(); + // Err(res) + }; + + join!(ws_server.start(handshaker)); + + Ok(()) +} diff --git a/lib/bus/mod.rs b/lib/bus/mod.rs index 9195a60..d5e6fa2 100644 --- a/lib/bus/mod.rs +++ b/lib/bus/mod.rs @@ -1,6 +1,8 @@ -use std::{convert::Infallible, fmt::Debug}; +use std::{fmt::Debug, pin::Pin}; -use {eyre::Result, rxrust::ops::box_it::CloneableBoxOpThreads, tonic::async_trait}; +use futures::Stream; + +use {eyre::Result, tonic::async_trait}; pub mod redis; @@ -39,5 +41,5 @@ pub trait SubscriberTrait { /// ๐ŸŽ ยป **stream** /// /// returns an `Observable` stream of messages from the redis bus - async fn stream(&mut self) -> Result>; + async fn stream(&mut self) -> Result + Send + 'static>>>; } diff --git a/lib/bus/redis/mod.rs b/lib/bus/redis/mod.rs index e7274f8..e4affdf 100644 --- a/lib/bus/redis/mod.rs +++ b/lib/bus/redis/mod.rs @@ -1,6 +1,7 @@ use {super::BusMessage, eyre::Result, redis::Client}; pub mod publish; +pub mod stream; pub mod subscribe; pub(crate) const KEY_PREFIX: &str = "rustler"; diff --git a/lib/bus/redis/stream/mod.rs b/lib/bus/redis/stream/mod.rs new file mode 100644 index 0000000..33fa96c --- /dev/null +++ b/lib/bus/redis/stream/mod.rs @@ -0,0 +1,65 @@ +use { + eyre::Result, + futures::Stream, + lool::fail, + std::{ + pin::Pin, + task::{Context, Poll}, + }, + tokio::sync::broadcast::{self, Receiver, Sender}, +}; + +use crate::bus::BusMessage; + +pub struct SourceStream { + sender: Option>, +} + +impl Default for SourceStream { + fn default() -> Self { + Self::new() + } +} + +impl SourceStream { + // Create a new SourceStream with a broadcast channel + pub fn new() -> Self { + let (sender, _) = broadcast::channel(100); // Adjust the buffer size as needed + SourceStream { + sender: Some(sender), + } + } + + pub fn sender(&self) -> Option> { + self.sender.clone() + } + + // Subscribe to the stream + pub fn subscribe(&self) -> Result + Send + 'static>>> { + if let Some(sender) = &self.sender { + let receiver = sender.subscribe(); + Ok(Box::pin(BroadcastStream { receiver })) + } else { + fail!("SourceStream has been consumed") + } + } +} + +// Wrapper around Receiver to implement Stream +struct BroadcastStream { + receiver: Receiver, +} + +impl Stream for BroadcastStream { + type Item = RM; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + // Spawn an async task to receive the message + tokio::task::block_in_place(|| match futures::executor::block_on(this.receiver.recv()) { + Ok(msg) => Poll::Ready(Some(msg)), + Err(broadcast::error::RecvError::Closed) => Poll::Ready(None), + Err(broadcast::error::RecvError::Lagged(_)) => Poll::Pending, + }) + } +} diff --git a/lib/bus/redis/subscribe.rs b/lib/bus/redis/subscribe.rs index 41d9582..3d252fb 100644 --- a/lib/bus/redis/subscribe.rs +++ b/lib/bus/redis/subscribe.rs @@ -1,14 +1,10 @@ use { - super::{key, PrefixedPubSub, RedisClient, KEY_PREFIX}, + super::{key, stream::SourceStream, PrefixedPubSub, RedisClient, KEY_PREFIX}, crate::bus::{BusMessage, SubscriberTrait}, eyre::Result, - futures::StreamExt, + futures::{Stream, StreamExt}, lool::{fail, s}, - rxrust::{ - observable::BoxIt, observer::Observer, ops::box_it::CloneableBoxOpThreads, - subject::SubjectThreads, subscription::Subscription, - }, - std::convert::Infallible, + std::pin::Pin, tonic::async_trait, }; @@ -24,9 +20,9 @@ pub struct RedisSubscriber { // this way we can just clone the connection when needing instead of storing the // redis client and creating a new connection client: redis::Client, - subject: Option>, key_prefix: String, pattern: String, + pub source_stream: Option>, } impl PrefixedPubSub for RedisSubscriber { @@ -50,7 +46,7 @@ impl RedisSubscriber { pattern: s!("*"), client: redis.get_client()?, key_prefix: s!(KEY_PREFIX), - subject: None, + source_stream: None, }) } @@ -67,21 +63,16 @@ impl RedisSubscriber { /// subscribe to the redis feed async fn start_streaming(&mut self) -> Result<()> { - if self.subject.is_none() { - self.subject = Some(SubjectThreads::default()); - } - - if self.subject.is_some() && self.subject.as_ref().unwrap().is_closed() { - drop(self.subject.take()); - self.subject = Some(SubjectThreads::default()); + if self.source_stream.is_none() { + self.source_stream = Some(SourceStream::new()); } let pattern = self.pattern.clone(); let mut conn = self.client.get_async_pubsub().await?; let prefix = self.get_prefix(); - if let Some(subject) = self.subject.as_mut() { - let mut stream = subject.clone(); + if let Some(stream) = self.source_stream.as_mut() { + let sender = stream.sender().unwrap(); tokio::spawn(async move { conn.psubscribe(key(prefix, pattern)).await?; @@ -92,13 +83,10 @@ impl RedisSubscriber { // TODO: handle possible panic when parsing message // using catch_unwind let message = RM::from_message(payload); - stream.next(message); + let _ = sender.send(message); } } - // if the connection with redis is closed, complete the stream - stream.complete(); - Result::<()>::Ok(()) }); } @@ -109,16 +97,13 @@ impl RedisSubscriber { #[async_trait] impl SubscriberTrait for RedisSubscriber { - /// ๐ŸŽ ยป **stream** - /// - /// returns an `Observable` stream of messages from the redis bus - async fn stream(&mut self) -> Result> { - if self.subject.is_none() { + async fn stream(&mut self) -> Result + Send + 'static>>> { + if self.source_stream.is_none() { self.start_streaming().await?; } - match self.subject.as_ref() { - Some(subject) => Ok(subject.clone().box_it()), + match self.source_stream.as_ref() { + Some(stream) => stream.subscribe(), None => fail!("Could not start streaming messages from redis bus"), } } diff --git a/lib/rustlers/rustler.rs b/lib/rustlers/rustler.rs index 164cf42..b1d61f0 100644 --- a/lib/rustlers/rustler.rs +++ b/lib/rustlers/rustler.rs @@ -11,6 +11,7 @@ use { chrono::{DateTime, Local}, eyre::Result, lool::s, + serde::Serialize, std::{ collections::HashMap, fmt::{self, Display, Formatter}, @@ -29,7 +30,7 @@ pub enum RustlerStatus { } /// ๐ŸŽ ยป an enum representing the different types of market hours -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum MarketHourType { Pre = 0, Regular = 1, @@ -57,7 +58,7 @@ impl From for MarketHourType { /// ๐ŸŽ ยป a struct storing a ticker's quote at a given time, and the change in price since the last /// quote -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct Quote { pub id: String, pub market: String, diff --git a/lib/socket/event.rs b/lib/socket/event.rs new file mode 100644 index 0000000..ed59754 --- /dev/null +++ b/lib/socket/event.rs @@ -0,0 +1,19 @@ +use { + serde::{Deserialize, Serialize}, + serde_json::{Map, Value}, +}; + +pub type Data = Map; + +#[derive(Serialize, Deserialize)] +pub struct WsEvent { + pub event: String, + pub data: Data, +} + +#[derive(Serialize, Deserialize)] +pub struct ErrorResponse { + #[serde(rename = "errorCode")] + pub error_code: u16, + pub msg: String, +} diff --git a/lib/socket/mod.rs b/lib/socket/mod.rs index 09ad0f0..d08180a 100644 --- a/lib/socket/mod.rs +++ b/lib/socket/mod.rs @@ -1 +1,5 @@ -// TODO: websocket gateway with tokio-tungstenite +pub mod event; +mod server; +pub mod stats; + +pub use server::*; diff --git a/lib/socket/server.rs b/lib/socket/server.rs new file mode 100644 index 0000000..66c1f94 --- /dev/null +++ b/lib/socket/server.rs @@ -0,0 +1,171 @@ +use { + super::{event, stats::ServerStats}, + async_trait::async_trait, + eyre::Result, + futures::{stream::SplitSink, StreamExt}, + lool::logger::{error, info}, + std::sync::Arc, + tokio::sync::Mutex, + tokio_tungstenite::{ + accept_hdr_async, + tungstenite::{handshake::server::Callback, Message}, + }, +}; + +pub use { + tokio::net::{TcpListener, TcpStream}, + tokio_tungstenite::{ + tungstenite::{ + handshake::server::{Request, Response}, + Error, + }, + WebSocketStream, + }, +}; + +pub type Outgoing = SplitSink, Message>; + +#[async_trait] +pub trait EventDispatcher: Send { + async fn dispatch( + &self, + event: String, + data: event::Data, + outgoing: Arc>, + ) -> Result<()>; +} + +/// ๐ŸŽ ยป **`socket::Server`** +/// -- +/// +/// A websocket gateway server that listens for incoming connections and dispatches events to the +/// appropriate event handlers by using the provided `EventDispatcher`. +/// +/// ### Example +/// See `examples/socket.rs` for a complete example. +pub struct Server +where + ED: EventDispatcher + Clone + Send + Sync + 'static, +{ + stats: Arc, + listener: TcpListener, + host: String, + port: String, + event_dispatcher: ED, +} + +impl Server +where + ED: EventDispatcher + Clone + Send + Sync + 'static, +{ + pub async fn new(host: &str, port: &str, event_dispatcher: ED) -> std::io::Result { + let listener = TcpListener::bind(format!("{}:{}", host, port)).await?; + + Ok(Self { + listener, + event_dispatcher, + host: host.to_string(), + port: port.to_string(), + stats: Arc::new(ServerStats::new()), + }) + } + + /// **๐ŸŽ ยป `start_no_cb`**: start the server + pub async fn start_no_cb(&mut self) { + let noop_cb = |_: &Request, response: Response| Ok(response); + self.start(noop_cb).await; + } + + /// **๐ŸŽ ยป `start`** + /// + /// Starts the server with a handshake callback. Usefull for customizing the + /// handshake process, e.g. checking headers, etc. + /// + /// **Tip:** if you don't need to customize the handshake process, use + /// `start_no_cb` instead. + pub async fn start(&mut self, cb: HCb) + where + HCb: Callback + Unpin + Clone, + { + info!("Started Rustler WS Server on {}:{}", self.host, self.port); + + let stats = &self.stats.clone(); + + while let Ok((stream, peer)) = self.listener.accept().await { + let dispatcher = self.event_dispatcher.clone(); + let cb = cb.clone(); + info!("Incoming connection from: {}", peer); + + // call the handshake callback + let ws_stream = accept_hdr_async(stream, cb).await; + + if let Ok(ws_stream) = ws_stream { + stats.inc_current_clients(); + + let stats = stats.clone(); + tokio::spawn(async move { + match Server::handle_connection(ws_stream, dispatcher).await { + Ok(_) => info!("Connection closed"), + Err(e) => error!("Error handling connection: {:?}", e), + }; + + // decrement client count + stats.clone().dec_current_clients(); + info!("{:?}", stats); + }); + } + + info!("{:?}", stats); + } + } + + /// subscribe to incoming messages + async fn handle_connection( + stream: WebSocketStream, + event_dispatcher: ED, + ) -> Result<()> { + let (outgoing, mut incoming) = stream.split(); + let synced_outgoing = Arc::new(Mutex::new(outgoing)); + + while let Some(msg) = incoming.next().await { + Server::handle_message(msg?, &event_dispatcher, synced_outgoing.clone()).await?; + } + + Ok(()) + } + + /// handle an incoming message + async fn handle_message( + msg: Message, + event_dispatcher: &ED, + outgoing: Arc>, + ) -> Result { + if msg.is_text() || msg.is_binary() { + if let Ok(event) = serde_json::from_str::(&msg.to_string()) { + let outgoing = Arc::clone(&outgoing); + let result = event_dispatcher.dispatch(event.event, event.data, outgoing).await; + + match result { + Ok(_) => {} + Err(e) => { + error!("Error dispatching event: {:?}", e); + } + }; + } + } + + if msg.is_close() { + return Ok(HandlingResult::Closed); + } + + // TODO: should we handle ping/pong messages? + + Ok(HandlingResult::Handled) + } +} + +#[derive(PartialEq)] +enum HandlingResult { + Handled, + Closed, +} diff --git a/lib/socket/stats.rs b/lib/socket/stats.rs new file mode 100644 index 0000000..1ea56bc --- /dev/null +++ b/lib/socket/stats.rs @@ -0,0 +1,41 @@ +use { + core::fmt, + std::{ + fmt::{Debug, Formatter}, + sync::{atomic::AtomicU64, Arc}, + }, +}; + +/// `ServerStats` is a struct that holds the server statistics such as total clients and current +/// clients; only used for debugging purposes. +#[derive(Default)] +pub struct ServerStats { + total_clients: Arc, + current_clients: Arc, +} + +impl ServerStats { + pub fn new() -> Self { + Self::default() + } + + /// increments the total clients and current clients count. + pub fn inc_current_clients(&self) { + self.current_clients.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.total_clients.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + /// decrements the current clients count. + pub fn dec_current_clients(&self) { + self.current_clients.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } +} + +impl Debug for ServerStats { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ServerStats") + .field("total_clients", &self.total_clients) + .field("current_clients", &self.current_clients) + .finish() + } +} diff --git a/rustfmt.toml b/rustfmt.toml index 76ddffc..eb331bf 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,7 +1,8 @@ max_width = 100 array_width = 80 chain_width = 100 -comment_width = 80 +comment_width = 70 imports_indent = "Block" imports_granularity = "One" empty_item_single_line = true +single_line_if_else_max_width = 70