diff --git a/Cargo.lock b/Cargo.lock index 06d0051..4891bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,6 +137,46 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -447,6 +487,29 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -626,6 +689,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.2" @@ -699,6 +771,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -730,6 +803,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -749,6 +823,32 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -761,6 +861,18 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-time" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6404853a6824881fe5f7d662d147dc4e84ecd2259ba0378f272a71dab600758a" +dependencies = [ + "async-channel", + "async-io", + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-util" version = "0.3.30" @@ -770,6 +882,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -823,6 +936,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.26" @@ -986,7 +1111,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.6", "tokio", "tower-service", "tracing", @@ -1075,6 +1200,26 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "itertools" version = "0.12.1" @@ -1131,6 +1276,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1431,6 +1582,12 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1544,6 +1701,22 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1726,6 +1899,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redis" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.6", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1859,6 +2053,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.34" @@ -1868,7 +2076,7 @@ dependencies = [ "bitflags 2.5.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", "windows-sys 0.52.0", ] @@ -1880,9 +2088,12 @@ dependencies = [ "chrono", "dotenvy", "eyre", + "futures", "getset", "lool", "prost", + "redis", + "rxrust", "sea-orm", "sea-orm-migration", "tokio", @@ -1897,6 +2108,21 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" +[[package]] +name = "rxrust" +version = "1.0.0-beta.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a529052da83e8897bb1f40459a0837b3c048eafabfae3bf1e7c050146b8141" +dependencies = [ + "futures", + "futures-time", + "gloo-timers", + "once_cell", + "pin-project-lite", + "smallvec", + "wasm-bindgen-futures", +] + [[package]] name = "ryu" version = "1.0.17" @@ -2148,6 +2374,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" @@ -2199,6 +2431,16 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.6" @@ -2547,8 +2789,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand", - "rustix", + "fastrand 2.0.2", + "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -2640,7 +2882,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2", + "socket2 0.5.6", "tokio-macros", "windows-sys 0.48.0", ] @@ -2963,6 +3205,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -3009,6 +3257,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.92" @@ -3038,6 +3298,16 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "whoami" version = "1.5.1" @@ -3048,6 +3318,28 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index a9de92e..4666d6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "rustler-core" version = "0.0.1" edition = "2021" -description = "🤠 » rustler-core market data extractor core functionality" +description = "🐎 » rustler-core market data extractor core functionality" authors = ["Lucas Colombo "] build = "lib/build.rs" @@ -23,7 +23,7 @@ async-trait = "0.1.79" # grpc & websocket tokio-tungstenite = { version = "0.21.0" } tonic = "0.11.0" -prost = "0.12.3" # protocol buffers +prost = "0.12.3" # protocol buffers # database sea-orm = { version = "0.12.15", features = [ @@ -35,6 +35,9 @@ sea-orm-migration = { version = "0.12.15", features = [ "runtime-tokio-native-tls", "sqlx-sqlite", ] } +redis = { version = "0.25.3", features = ["tokio-comp"] } +futures = "0.3.30" +rxrust = "1.0.0-beta.7" [dependencies.lool] version = "^0.2.0" # crates: disable-check diff --git a/Taskfile.yaml b/Taskfile.yaml index 6c2a213..90ed7ae 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -3,48 +3,47 @@ version: '3' tasks: - db:up: - desc: 🗃️ run database migrations (up) - cmds: - - sea migrate up - - db:down: - desc: 🗃️ run database migrations (down) - cmds: - - sea migrate down + db:up: + desc: 🗃️ run database migrations (up) + cmds: + - sea migrate up - run:watch: - desc: 🚀 watch rustler - cmds: - - cargo watch -c -x "run --example=rustler" + db:down: + desc: 🗃️ run database migrations (down) + cmds: + - sea migrate down - build:watch: - desc: 🚀 watch rustler «build» - cmds: - - cargo watch -c -x "build" + run:watch: + desc: 🚀 watch rustler + cmds: + - cargo watch -c -x "run --example=rustler" - build: - desc: ⚡ build rustler «release» - cmds: - - cargo build --release - - python check_size.py + build:watch: + desc: 🚀 watch rustler «build» + cmds: + - cargo watch -c -x "build" - fmt: - desc: 🎨 format rustler - cmds: - - cargo +nightly fmt --all + build: + desc: ⚡ build rustler «release» + cmds: + - cargo build --release - test: - desc: 🧪 test lool - cmds: - - cargo nextest run --all-features --workspace + fmt: + desc: 🎨 format rustler + cmds: + - cargo +nightly fmt --all - lint: - desc: 🧶 lint rustler - cmds: - - cargo clippy --fix --workspace --allow-staged + test: + desc: 🧪 test lool + cmds: + - cargo nextest run --all-features --workspace - publish: - desc: 🚀 publish lool - cmds: - - cargo publish --registry lugit-sa + lint: + desc: 🧶 lint rustler + cmds: + - cargo clippy --fix --workspace --allow-staged + + publish: + desc: 🚀 publish lool + cmds: + - cargo publish --registry lugit-sa diff --git a/examples/binance/mod.rs b/examples/binance/mod.rs index ede5f57..46d4609 100644 --- a/examples/binance/mod.rs +++ b/examples/binance/mod.rs @@ -14,10 +14,18 @@ rustler!( pub struct FooRustler {} ); +#[allow(dead_code)] impl FooRustler { pub fn create() -> impl Rustler { Self::default() } + + pub fn create_with_external_stuff(name: String) -> impl Fn() -> FooRustler { + move || { + println!("Creating a new FooRustler using external name = {}", name); + Self::default() + } + } } #[async_trait] diff --git a/examples/bus-pub.rs b/examples/bus-pub.rs new file mode 100644 index 0000000..13a2ec4 --- /dev/null +++ b/examples/bus-pub.rs @@ -0,0 +1,88 @@ +mod binance; + +use { + eyre::{set_hook, DefaultHandler, Result}, + lool::s, + rustler_core::rustlers::{bus, MarketHourType, Quote}, +}; + +#[tokio::main] +async fn main() -> Result<()> { + set_hook(Box::new(DefaultHandler::default_with))?; + let mut px = bus::publisher(&"redis://127.0.0.1/").await?; + let variations = vec![-4.3, -1.1, 2.0, -0.5, 1.5, -1.3, 0.7, 0.3, -0.1, 3.4]; + + let vars = variations.clone(); + let vars2 = variations.clone(); + + let mut publisher = px.clone(); + let publish1 = async move { + let mut idx = 0; + let mut price = 50000.0; + + loop { + // sleep for 1 second + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // random percentage change + let change_percent = vars[idx]; + price = price + (price * change_percent / 100.0); + + let quote = Quote { + market: s!("BINANCE"), + id: s!("BTCUSDT"), + change_percent, + market_hours: MarketHourType::Regular, + price, + time: 198798798798, + }; + + println!("Publishing quote, {}", quote); + + publisher.publish(quote).await?; + + // keep the index within 0 and length of variations + idx = (idx + 1) % vars.len(); + } + + #[allow(unreachable_code)] + Result::<()>::Ok(()) + }; + + let publish2 = async move { + let mut idx = 0; + let mut price = 300.0; + + loop { + // sleep for 1 second + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // random percentage change + let change_percent = vars2[idx]; + price = price + (price * change_percent / 100.0); + + let quote = Quote { + market: s!("NASDAQ"), + id: s!("GOOGL"), + change_percent, + market_hours: MarketHourType::Regular, + price, + time: 198798798798, + }; + + println!("Publishing quote, {}", quote); + + px.publish(quote).await?; + + // keep the index within 0 and length of vars2 + idx = (idx + 1) % vars2.len(); + } + + #[allow(unreachable_code)] + Result::<()>::Ok(()) + }; + + let _ = tokio::join!(publish1, publish2); + + Ok(()) +} diff --git a/examples/bus-sub.rs b/examples/bus-sub.rs new file mode 100644 index 0000000..dafe12e --- /dev/null +++ b/examples/bus-sub.rs @@ -0,0 +1,26 @@ +use { + eyre::{set_hook, DefaultHandler, Result}, + rustler_core::rustlers::{bus, Quote, Ticker}, + rxrust::observable::{ObservableExt, ObservableItem}, +}; + +#[tokio::main] +async fn main() -> Result<()> { + set_hook(Box::new(DefaultHandler::default_with))?; + + let mut sx = bus::subscriber::(&"redis://127.0.0.1/").await?; + + let ticker = Ticker { + market: "BINANCE".to_string(), + symbol: "BTCUSDT".to_string(), + }; + + let _obs = sx.stream().await?.filter(move |quote| quote.belongs_to(&ticker)).subscribe(|v| { + println!("Received quote: {}", v); + }); + + // wait for 10 seconds + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + Ok(()) +} diff --git a/examples/rustler.rs b/examples/rustler.rs index ea40fb0..d2e7038 100644 --- a/examples/rustler.rs +++ b/examples/rustler.rs @@ -19,7 +19,7 @@ async fn main() -> Result<()> { let mut rustler = RustlersSvc::new( conn.clone(), rustlerjar! { - "BINANCE" => FooRustler + "BINANCE" => FooRustler::create }, ) .await; diff --git a/lib/entities/migration/m20220101_000001_create_table_market.rs b/lib/entities/migration/m20220101_000001_create_table_market.rs index 3af09fb..a3514bd 100644 --- a/lib/entities/migration/m20220101_000001_create_table_market.rs +++ b/lib/entities/migration/m20220101_000001_create_table_market.rs @@ -1,7 +1,7 @@ use sea_orm_migration::{async_trait::async_trait, prelude::*}; #[derive(DeriveMigrationName)] -/// 🤠 » create table `market` +/// 🐎 » create table `market` pub struct Migration; #[async_trait] diff --git a/lib/entities/migration/m20240325_200049_create_table_ticker.rs b/lib/entities/migration/m20240325_200049_create_table_ticker.rs index 6fea9fa..ab50754 100644 --- a/lib/entities/migration/m20240325_200049_create_table_ticker.rs +++ b/lib/entities/migration/m20240325_200049_create_table_ticker.rs @@ -1,7 +1,7 @@ use sea_orm_migration::{async_trait::async_trait, prelude::*}; #[derive(DeriveMigrationName)] -/// 🤠 » create table `ticker` +/// 🐎 » create table `ticker` pub struct Migration; #[async_trait] diff --git a/lib/entities/orm/market.rs b/lib/entities/orm/market.rs index a04589d..c16daf9 100644 --- a/lib/entities/orm/market.rs +++ b/lib/entities/orm/market.rs @@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "market")] -/// 🤠 » market entity model +/// 🐎 » market entity model pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub id: String, diff --git a/lib/entities/orm/ticker.rs b/lib/entities/orm/ticker.rs index 92fda31..7ce060e 100644 --- a/lib/entities/orm/ticker.rs +++ b/lib/entities/orm/ticker.rs @@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "ticker")] -/// 🤠 » ticker entity model +/// 🐎 » ticker entity model pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub id: String, diff --git a/lib/entities/services/market.rs b/lib/entities/services/market.rs index 00acc94..2e19659 100644 --- a/lib/entities/services/market.rs +++ b/lib/entities/services/market.rs @@ -7,7 +7,7 @@ use { sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel}, }; -/// 🤠 » service for the `Market` entity +/// 🐎 » service for the `Market` entity pub struct Service { conn: DatabaseConnection, } @@ -17,19 +17,19 @@ impl Service { Self { conn } } - /// 🤠 » gets all markets from the database + /// 🐎 » gets all markets from the database pub async fn get_all(&self) -> Result, DbErr> { let markets = Market::find().all(&self.conn).await?; Ok(markets) } - /// 🤠 » gets a market by its id + /// 🐎 » gets a market by its id pub async fn create(&self, market: MarketModel) -> Result { Market::insert(market.clone().into_active_model()).exec(&self.conn).await?; Ok(market) } - /// 🤠 » gets all markets with their tickers + /// 🐎 » gets all markets with their tickers pub async fn get_all_with_tickers( &self, ) -> Result)>, DbErr> { diff --git a/lib/entities/services/ticker.rs b/lib/entities/services/ticker.rs index f0c7ea0..68dc8d9 100644 --- a/lib/entities/services/ticker.rs +++ b/lib/entities/services/ticker.rs @@ -7,7 +7,7 @@ use { sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter}, }; -/// 🤠 » service for the `Ticker` entity +/// 🐎 » service for the `Ticker` entity pub struct Service { conn: DatabaseConnection, } diff --git a/lib/grpc/server.rs b/lib/grpc/server.rs index e002930..147ef69 100644 --- a/lib/grpc/server.rs +++ b/lib/grpc/server.rs @@ -12,7 +12,7 @@ use { const RUSTLER_GRPC_API_ADDR: &str = "RUSTLER_GRPC_API_ADDR"; -/// 🤠 » starts the rustler gRPC server +/// 🐎 » starts the rustler gRPC server pub async fn start(conn: DatabaseConnection) -> Result<()> { fn get_default_addr() -> String { let addr = "0.0.0.0:50051"; diff --git a/lib/grpc/services/market.rs b/lib/grpc/services/market.rs index 39b5362..3afe318 100644 --- a/lib/grpc/services/market.rs +++ b/lib/grpc/services/market.rs @@ -47,7 +47,7 @@ impl Market { } } -/// 🤠 » grpc Server to manage market entities +/// 🐎 » grpc Server to manage market entities pub struct GrpcServer { pub(crate) svc: market::Service, } @@ -59,7 +59,7 @@ impl GrpcServer { } } - /// 🤠 » creates the market api server + /// 🐎 » creates the market api server pub fn svc(self) -> MarketApiServer { MarketApiServer::new(self) } diff --git a/lib/grpc/services/ticker.rs b/lib/grpc/services/ticker.rs index b32e6fc..9fff7d8 100644 --- a/lib/grpc/services/ticker.rs +++ b/lib/grpc/services/ticker.rs @@ -32,7 +32,7 @@ impl Ticker { } } -/// 🤠 » grpc Server to manage ticker entities +/// 🐎 » grpc Server to manage ticker entities pub struct GrpcServer { pub(crate) svc: ticker::Service, } @@ -44,7 +44,7 @@ impl GrpcServer { } } - /// 🤠 » creates the ticker api server + /// 🐎 » creates the ticker api server pub fn svc(self) -> TickerApiServer { TickerApiServer::new(self) } diff --git a/lib/rustlers/bus/mod.rs b/lib/rustlers/bus/mod.rs new file mode 100644 index 0000000..dfbc298 --- /dev/null +++ b/lib/rustlers/bus/mod.rs @@ -0,0 +1,123 @@ +use {eyre::Result, redis::Client, std::fmt::Debug}; + +pub mod publish; +pub mod subscribe; + +pub(crate) const KEY_PREFIX: &str = "rustler"; + +/// creates a message redis key from a prefix and a key +pub(crate) fn key, K: AsRef>(prefix: T, key: K) -> String { + let prefix = prefix.as_ref(); + let key = key.as_ref(); + + match prefix.is_empty() { + true => key.to_string(), + false => format!("{}:{}", prefix, key), + } +} + +/// represents a pub or sub handler that can be prefixed +pub trait PrefixedPubSub { + fn get_prefix(&self) -> String; + fn set_prefix(&mut self, prefix: &str) -> &mut Self; + + fn with_prefix(&mut self, prefix: &str) -> &mut Self { + self.set_prefix(prefix); + self + } + + fn without_prefix(&mut self) -> &mut Self { + self.set_prefix(""); + self + } +} + +/// 🐎 » represents a an entity that can provide a redis client +pub trait RedisClient { + fn get_client(&self) -> Result; +} + +impl RedisClient for Client { + fn get_client(&self) -> Result { + let redis = self.clone(); + Ok(redis) + } +} + +impl RedisClient for &str { + fn get_client(&self) -> Result { + let redis = Client::open(*self)?; + Ok(redis) + } +} + +impl RedisClient for String { + fn get_client(&self) -> Result { + let redis = Client::open(self.as_str())?; + Ok(redis) + } +} + +/// 🐎 » represents a value that can be serialized to a redis value +pub trait ToRedisVal { + fn to_redis_val(&self) -> Vec<(String, String)>; +} + +/// 🐎 » represents a value that can be serialized to a redis key +pub trait ToRedisKey { + fn to_redis_key(&self) -> String; +} + +/// 🐎 » represents a value that can be serialized to and from a redis message +pub trait ToFromRedisMessage { + fn as_message(&self) -> String; + fn from_message>(msg: T) -> Self; +} + +/// 🐎 » supertrait combining all redis object traits + debug + send + sync + 'static +pub trait RedisMessage: + ToRedisVal + ToRedisKey + ToFromRedisMessage + Debug + Clone + Send + Sync + PartialEq + 'static +{ +} + +/// 🐎 » **publisher**: create bus publisher +/// +/// **Arguments** +/// - `redis` - a redis client or a redis connection string +/// +/// **Returns** +/// - a new `Publisher` instance +pub async fn publisher( + redis: &RC, +) -> Result> { + publish::Publisher::new(redis).await +} + +/// 🐎 » **subscriber**: create bus subscriber +/// +/// **Arguments** +/// - `redis` - a redis client or a redis connection string +/// +/// **Returns** +/// - a new `Subscriber` instance +pub async fn subscriber( + redis: &RC, +) -> Result> { + subscribe::Subscriber::new(redis).await +} + +/// 🐎 » **pubsub*: create bus publisher and subscriber +/// +/// **Arguments** +/// - `redis` - a redis client or a redis connection string +/// +/// **Returns** +/// - a tuple containing a `Publisher` and a `Subscriber` instance +pub async fn pubsub( + redis: &T, +) -> Result<(publish::Publisher, subscribe::Subscriber)> { + let publisher = publish::Publisher::new(redis).await?; + let subscriber = subscribe::Subscriber::new(redis).await?; + + Ok((publisher, subscriber)) +} diff --git a/lib/rustlers/bus/publish.rs b/lib/rustlers/bus/publish.rs new file mode 100644 index 0000000..2d95127 --- /dev/null +++ b/lib/rustlers/bus/publish.rs @@ -0,0 +1,52 @@ +use { + super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX}, + eyre::Result, + redis::{aio::MultiplexedConnection, AsyncCommands}, +}; + +/// 🐎 » bus **Publisher** +#[derive(Clone)] +pub struct Publisher { + conn: MultiplexedConnection, + key_prefix: String, + resource_type: std::marker::PhantomData, +} + +impl PrefixedPubSub for Publisher { + fn get_prefix(&self) -> String { + self.key_prefix.clone() + } + + fn set_prefix(&mut self, prefix: &str) -> &mut Self { + self.key_prefix = prefix.to_string(); + self + } +} + +impl Publisher { + /// 🐎 » create a new bus publisher + pub async fn new(redis: &RC) -> Result + where + RC: RedisClient, + { + let redis = redis.get_client()?; + let conn = redis.get_multiplexed_tokio_connection().await?; + + Ok(Self { + conn, + key_prefix: KEY_PREFIX.to_string(), + resource_type: std::marker::PhantomData, + }) + } + + /// 🐎 » publish a message to the bus + pub async fn publish(&mut self, value: RM) -> Result<()> { + let obj_key = key(self.get_prefix(), value.to_redis_key()); + // set hash key + self.conn.hset_multiple(&obj_key, value.to_redis_val().as_slice()).await?; + + // publish to the appropriate channel + self.conn.publish(&obj_key, value.as_message()).await?; + Ok(()) + } +} diff --git a/lib/rustlers/bus/subscribe.rs b/lib/rustlers/bus/subscribe.rs new file mode 100644 index 0000000..af6980a --- /dev/null +++ b/lib/rustlers/bus/subscribe.rs @@ -0,0 +1,113 @@ +use { + super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX}, + eyre::Result, + futures::StreamExt, + lool::{fail, s}, + rxrust::{ + observable::BoxIt, observer::Observer, ops::box_it::CloneableBoxOpThreads, + subject::SubjectThreads, subscription::Subscription, + }, + std::convert::Infallible, +}; + +// IDEA: create another version using tokio broadcast channels +// https://github.com/exein-io/pulsar/blob/99ad35c8d13eaf1a37d7b6a9dcb812a5a1231d00/crates/pulsar-core/src/bus.rs + +/// 🐎 » bus **Subscriber** +pub struct Subscriber { + // TODO: replace with storing tokio multiplexed connection like in publish.rs when redis@0.26.0 + // is released see https://github.com/redis-rs/redis-rs/issues/1137. + // this way we can just clone the connection when needing instead of storing the + // redis client and creating a new connection + client: redis::Client, + subject: Option>, + key_prefix: String, + pattern: String, +} + +impl PrefixedPubSub for Subscriber { + fn get_prefix(&self) -> String { + self.key_prefix.clone() + } + + fn set_prefix(&mut self, prefix: &str) -> &mut Self { + self.key_prefix = s!(prefix); + self + } +} + +impl Subscriber { + /// 🐎 » create a new bus subscriber + pub async fn new(redis: &RC) -> Result + where + RC: RedisClient, + { + Ok(Self { + pattern: s!("*"), + client: redis.get_client()?, + key_prefix: s!(KEY_PREFIX), + subject: None, + }) + } + + pub fn with_pattern(&mut self, pattern: &str) -> &mut Self { + self.pattern = s!(pattern); + self + } + + pub fn get_pattern(&self) -> String { + key(self.get_prefix(), self.pattern.clone()) + } + + /// 🐎 » subscribe to a channel + pub async fn start_streaming(&mut self) -> Result<()> { + if self.subject.is_none() { + self.subject = Some(SubjectThreads::default()); + } + + if self.subject.is_some() && self.subject.as_ref().unwrap().is_closed() { + drop(self.subject.take()); + self.subject = Some(SubjectThreads::default()); + } + + let pattern = self.pattern.clone(); + let mut conn = self.client.get_async_pubsub().await?; + let prefix = self.get_prefix(); + + if let Some(subject) = self.subject.as_mut() { + let mut stream = subject.clone(); + + tokio::spawn(async move { + conn.psubscribe(key(prefix, pattern)).await?; + + let mut msg_stream = conn.on_message(); + while let Some(msg) = msg_stream.next().await { + if let Ok(payload) = msg.get_payload::() { + // TODO: handle possible panic when parsing message + // using catch_unwind + let message = RM::from_message(payload); + stream.next(message); + } + } + + // if the connection with redis is closed, complete the stream + stream.complete(); + + Result::<()>::Ok(()) + }); + } + + Ok(()) + } + + pub async fn stream(&mut self) -> Result> { + if self.subject.is_none() { + self.start_streaming().await?; + } + + match self.subject.as_ref() { + Some(subject) => Ok(subject.clone().box_it()), + None => fail!("Could not start streaming messages from redis bus"), + } + } +} diff --git a/lib/rustlers/mod.rs b/lib/rustlers/mod.rs index 8e5f582..da5b1a1 100644 --- a/lib/rustlers/mod.rs +++ b/lib/rustlers/mod.rs @@ -1,6 +1,6 @@ mod rustler; +pub mod bus; pub mod rustlerjar; pub mod svc; - pub use rustler::*; diff --git a/lib/rustlers/rustler.rs b/lib/rustlers/rustler.rs index aad90c3..ced2716 100644 --- a/lib/rustlers/rustler.rs +++ b/lib/rustlers/rustler.rs @@ -2,11 +2,16 @@ pub extern crate chrono; pub extern crate eyre; use { + super::bus::{RedisMessage, ToFromRedisMessage, ToRedisKey, ToRedisVal}, crate::entities::{market, ticker}, async_trait::async_trait, chrono::{DateTime, Local}, eyre::Result, - std::collections::HashMap, + lool::s, + std::{ + collections::HashMap, + fmt::{self, Display, Formatter}, + }, }; #[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -20,10 +25,28 @@ pub enum RustlerStatus { #[derive(Debug, Clone)] pub enum MarketHourType { - Pre, - Regular, - Post, - Extended, + Pre = 0, + Regular = 1, + Post = 2, + Extended = 3, +} + +impl From for u8 { + fn from(market_hour_type: MarketHourType) -> Self { + market_hour_type as u8 + } +} + +impl From for MarketHourType { + fn from(market_hour_type: u8) -> Self { + match market_hour_type { + 0 => MarketHourType::Pre, + 1 => MarketHourType::Regular, + 2 => MarketHourType::Post, + 3 => MarketHourType::Extended, + _ => MarketHourType::Regular, + } + } } #[derive(Debug, Clone)] @@ -31,11 +54,105 @@ pub struct Quote { pub id: String, pub market: String, pub price: f64, - pub change_percent: Option, - pub time: Option, - pub market_hours: Option, + pub change_percent: f64, + pub time: i64, + pub market_hours: MarketHourType, } +impl Quote { + pub fn belongs_to(&self, ticker: &Ticker) -> bool { + self.id == ticker.symbol && self.market == ticker.market + } +} + +impl Display for Quote { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl ToRedisVal for Quote { + fn to_redis_val(&self) -> Vec<(String, String)> { + let market_hours_u8: u8 = self.market_hours.clone().into(); + + vec![ + (s!("id"), self.id.to_owned()), + (s!("market"), self.market.to_owned()), + (s!("price"), self.price.to_string()), + (s!("market_hours"), market_hours_u8.to_string()), + (s!("time"), self.time.to_string()), + (s!("change_percent"), self.change_percent.to_string()), + ] + } +} + +impl ToRedisKey for Quote { + fn to_redis_key(&self) -> String { + format!("quote:{}:{}", self.market, self.id) + } +} + +impl ToFromRedisMessage for Quote { + fn as_message(&self) -> String { + // id¦market¦price¦change_percent¦time¦market_hours + format!( + "{}¦{}¦{}¦{}¦{}¦{}", + self.id, + self.market, + self.price, + self.change_percent, + self.time, + Into::::into(self.market_hours.clone()) + ) + } + + /// 🐎 » creates a `Quote` from a message + /// + /// the message should be in the format `id¦market¦price¦change_percent¦time¦market_hours` + /// + /// **panics** if the message is not in the correct format + fn from_message>(msg: T) -> Self { + let msg = msg.as_ref(); + let parts: Vec<&str> = msg.split('¦').collect(); + + let id = parts[0].to_string(); + let market = parts[1].to_string(); + let price = parts[2].parse::().unwrap(); + let change_percent = parts[3].parse::().unwrap(); + let time = parts[4].parse::().unwrap(); + let market_hours = parts[5].parse::().unwrap().into(); + + Self { + id, + market, + price, + change_percent, + time, + market_hours, + } + } +} + +impl PartialEq for Quote { + fn eq(&self, other: &Ticker) -> bool { + self.id == other.symbol && self.market == other.market + } +} + +impl PartialEq for Ticker { + fn eq(&self, other: &Quote) -> bool { + self.symbol == other.id && self.market == other.market + } +} + +impl PartialEq for Quote { + fn eq(&self, other: &Quote) -> bool { + self.id == other.id && self.market == other.market + } +} + +impl RedisMessage for Quote {} + #[derive(Debug, Clone)] pub struct RustlerOpts { pub connect_on_start: bool, @@ -58,7 +175,7 @@ pub struct ScrapperCallbacks { pub on_message: Option Result<()>>, } -/// 🤠 » a scruct representing a ticker +/// 🐎 » a scruct representing a ticker /// /// in `rustler` a ticker is the union between a symbol (stock identifier) and its market /// @@ -83,7 +200,7 @@ impl Ticker { tickers.iter().map(|t| Self::from(t, market)).collect() } - /// 🤠 » returns the key of the ticker + /// 🐎 » returns the key of the ticker pub fn key(&self) -> String { format!("{}:{}", self.market, self.symbol) } @@ -130,17 +247,17 @@ pub trait RustlerAccessor { #[async_trait] pub trait Rustler: RustlerAccessor + Send + Sync { // #region Unimplemented trait functions - /// 🤠 » fn called after tickers are added to the rustler + /// 🐎 » fn called after tickers are added to the rustler fn on_add(&mut self, tickers: &[Ticker]) -> Result<()>; - /// 🤠 » fn called after tickers are deleted from the rustler + /// 🐎 » fn called after tickers are deleted from the rustler fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()>; - /// 🤠 » connects the rustler to the data source + /// 🐎 » connects the rustler to the data source async fn connect(&mut self) -> Result<()>; - /// 🤠 » disconnects the rustler from the data source + /// 🐎 » disconnects the rustler from the data source async fn disconnect(&mut self) -> Result<()>; // #endregion - /// 🤠 » starts the rustler + /// 🐎 » starts the rustler async fn start(&mut self) -> Result<()> { let opts = self.opts(); if opts.connect_on_start { @@ -149,7 +266,7 @@ pub trait Rustler: RustlerAccessor + Send + Sync { Ok(()) } - /// 🤠 » updates last stop and last run times and calls the appropriate callback + /// 🐎 » updates last stop and last run times and calls the appropriate callback /// /// should be called after the status of the rustler changes fn handle_status_change(&mut self) -> Result<()> { @@ -340,7 +457,7 @@ macro_rules! rustler_accessors { }; } -/// **🤠 » rustler builder macro** +/// **🐎 » rustler builder macro** /// /// The `rustler!` macro is used to define a new `Rustler` struct, expanding the struct definition /// with the required fields and derives, and implementing the `RustlerAccessor` trait for the diff --git a/lib/rustlers/rustlerjar.rs b/lib/rustlers/rustlerjar.rs index cad59ba..d7232f9 100644 --- a/lib/rustlers/rustlerjar.rs +++ b/lib/rustlers/rustlerjar.rs @@ -5,7 +5,7 @@ use { tokio::sync::Mutex, }; -/// **🤠 » rustlerjar! macro** +/// **🐎 » rustlerjar! macro** /// /// A macro to create a `RustlerJar` with multiple Rustler instances and their corresponding /// mappings. @@ -14,20 +14,20 @@ use { /// /// ```rust /// let rustler_jar = rustlerjar! { -/// "NYSE", "NASDAQ" => FooRustler, -/// "BINANCE" => BarRustler, +/// "NYSE", "NASDAQ" => FooRustler::create, +/// "BINANCE" => BarRustler::create(url), /// }; /// ``` #[macro_export] macro_rules! rustlerjar { - ($($($name:expr),* => $rustler:ident),* $(,)?) => {{ + ($($($name:expr),* => $rustler:expr),* $(,)?) => {{ use $crate::rustlers::RustlerAccessor; let mut instances: Vec> = Vec::new(); let mut mappings = std::collections::HashMap::new(); $( - let instance = Box::new($rustler::create()); + let instance = Box::new($rustler()); $( mappings.insert($name.to_string(), instance.name()); )* @@ -38,7 +38,7 @@ macro_rules! rustlerjar { }}; } -/// **🤠 » RustlerJar** +/// **🐎 » RustlerJar** /// /// A `RustlerJar` is a collection of Rustlers and their corresponding mappings to the markets. /// Which indicates which Rustler should be used for a given market. Rustlers are stored as @@ -50,8 +50,8 @@ macro_rules! rustlerjar { /// The easiest way to create a `RustlerJar` is by using the `rustlerjar!` macro. /// ```rust /// let rustler_jar = rustlerjar! { -/// "NYSE", "NASDAQ" => FooRustler, -/// "BINANCE" => BarRustler, +/// "NYSE", "NASDAQ" => FooRustler::create, +/// "BINANCE" => BarRustler::create(url), /// }; /// /// let rustler = rustler_jar.get(&market); diff --git a/lib/rustlers/svc.rs b/lib/rustlers/svc.rs index 26e2cc8..921eb76 100644 --- a/lib/rustlers/svc.rs +++ b/lib/rustlers/svc.rs @@ -33,7 +33,7 @@ use { // stopJob?: Job, // } -/// **🤠 » Rustlers Service** +/// **🐎 » Rustlers Service** /// /// The `RustlersSvc` is a service that manages the rustlers and orchestrates their executions. pub struct RustlersSvc { @@ -43,7 +43,7 @@ pub struct RustlersSvc { } impl RustlersSvc { - /// **🤠 » create service** + /// **🐎 » create service** /// /// creates a new instance of the `RustlersSvc` /// @@ -64,7 +64,7 @@ impl RustlersSvc { } } - /// **🤠 » start rustlers** + /// **🐎 » start rustlers** /// /// gets market data from the the database and starts /// the corresponding rustler for each market @@ -79,7 +79,7 @@ impl RustlersSvc { Ok(()) } - /// **🤠 » restart rustlers** + /// **🐎 » restart rustlers** /// /// stops all rustlers and then starts them again pub async fn restart(&self) -> Result<()> { diff --git a/lib/socket/mod.rs b/lib/socket/mod.rs index 78decec..09ad0f0 100644 --- a/lib/socket/mod.rs +++ b/lib/socket/mod.rs @@ -1 +1 @@ -// TODO: websocket gateway +// TODO: websocket gateway with tokio-tungstenite diff --git a/🤠 rustler.code-workspace b/🐎 rustler-core.code-workspace similarity index 100% rename from 🤠 rustler.code-workspace rename to 🐎 rustler-core.code-workspace