feat: pub-sub feature

This commit is contained in:
Lucas Colombo 2024-05-10 22:34:32 -03:00
parent 091c726bcd
commit 0557781373
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
25 changed files with 912 additions and 91 deletions

302
Cargo.lock generated
View File

@ -137,6 +137,46 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-io"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
dependencies = [
"async-lock",
"autocfg",
"cfg-if",
"concurrent-queue",
"futures-lite",
"log",
"parking",
"polling",
"rustix 0.37.27",
"slab",
"socket2 0.4.10",
"waker-fn",
]
[[package]]
name = "async-lock"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -447,6 +487,29 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@ -626,6 +689,15 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fastrand"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
dependencies = [
"instant",
]
[[package]]
name = "fastrand"
version = "2.0.2"
@ -699,6 +771,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -730,6 +803,7 @@ dependencies = [
"futures-core",
"futures-task",
"futures-util",
"num_cpus",
]
[[package]]
@ -749,6 +823,32 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand 1.9.0",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -761,6 +861,18 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-time"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6404853a6824881fe5f7d662d147dc4e84ecd2259ba0378f272a71dab600758a"
dependencies = [
"async-channel",
"async-io",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "futures-util"
version = "0.3.30"
@ -770,6 +882,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -823,6 +936,18 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-timers"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "h2"
version = "0.3.26"
@ -986,7 +1111,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.6",
"tokio",
"tower-service",
"tracing",
@ -1075,6 +1200,26 @@ dependencies = [
"syn 2.0.60",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "itertools"
version = "0.12.1"
@ -1131,6 +1276,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
@ -1431,6 +1582,12 @@ dependencies = [
"syn 2.0.60",
]
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1544,6 +1701,22 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "polling"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if",
"concurrent-queue",
"libc",
"log",
"pin-project-lite",
"windows-sys 0.48.0",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -1726,6 +1899,27 @@ dependencies = [
"getrandom",
]
[[package]]
name = "redis"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
dependencies = [
"async-trait",
"bytes",
"combine",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2 0.5.6",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -1859,6 +2053,20 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.37.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2"
dependencies = [
"bitflags 1.3.2",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
"windows-sys 0.48.0",
]
[[package]]
name = "rustix"
version = "0.38.34"
@ -1868,7 +2076,7 @@ dependencies = [
"bitflags 2.5.0",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.4.13",
"windows-sys 0.52.0",
]
@ -1880,9 +2088,12 @@ dependencies = [
"chrono",
"dotenvy",
"eyre",
"futures",
"getset",
"lool",
"prost",
"redis",
"rxrust",
"sea-orm",
"sea-orm-migration",
"tokio",
@ -1897,6 +2108,21 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47"
[[package]]
name = "rxrust"
version = "1.0.0-beta.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a529052da83e8897bb1f40459a0837b3c048eafabfae3bf1e7c050146b8141"
dependencies = [
"futures",
"futures-time",
"gloo-timers",
"once_cell",
"pin-project-lite",
"smallvec",
"wasm-bindgen-futures",
]
[[package]]
name = "ryu"
version = "1.0.17"
@ -2148,6 +2374,12 @@ dependencies = [
"digest",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha2"
version = "0.10.8"
@ -2199,6 +2431,16 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.6"
@ -2547,8 +2789,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
dependencies = [
"cfg-if",
"fastrand",
"rustix",
"fastrand 2.0.2",
"rustix 0.38.34",
"windows-sys 0.52.0",
]
@ -2640,7 +2882,7 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"socket2",
"socket2 0.5.6",
"tokio-macros",
"windows-sys 0.48.0",
]
@ -2963,6 +3205,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690"
[[package]]
name = "want"
version = "0.3.1"
@ -3009,6 +3257,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.92"
@ -3038,6 +3298,16 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "web-sys"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "whoami"
version = "1.5.1"
@ -3048,6 +3318,28 @@ dependencies = [
"wasite",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"

View File

@ -2,7 +2,7 @@
name = "rustler-core"
version = "0.0.1"
edition = "2021"
description = "🤠 » rustler-core market data extractor core functionality"
description = "🐎 » rustler-core market data extractor core functionality"
authors = ["Lucas Colombo <lucas@lucode.ar>"]
build = "lib/build.rs"
@ -35,6 +35,9 @@ sea-orm-migration = { version = "0.12.15", features = [
"runtime-tokio-native-tls",
"sqlx-sqlite",
] }
redis = { version = "0.25.3", features = ["tokio-comp"] }
futures = "0.3.30"
rxrust = "1.0.0-beta.7"
[dependencies.lool]
version = "^0.2.0" # crates: disable-check

View File

@ -27,7 +27,6 @@ tasks:
desc: ⚡ build rustler «release»
cmds:
- cargo build --release
- python check_size.py
fmt:
desc: 🎨 format rustler

View File

@ -14,10 +14,18 @@ rustler!(
pub struct FooRustler {}
);
#[allow(dead_code)]
impl FooRustler {
pub fn create() -> impl Rustler {
Self::default()
}
pub fn create_with_external_stuff(name: String) -> impl Fn() -> FooRustler {
move || {
println!("Creating a new FooRustler using external name = {}", name);
Self::default()
}
}
}
#[async_trait]

88
examples/bus-pub.rs Normal file
View File

@ -0,0 +1,88 @@
mod binance;
use {
eyre::{set_hook, DefaultHandler, Result},
lool::s,
rustler_core::rustlers::{bus, MarketHourType, Quote},
};
#[tokio::main]
async fn main() -> Result<()> {
set_hook(Box::new(DefaultHandler::default_with))?;
let mut px = bus::publisher(&"redis://127.0.0.1/").await?;
let variations = vec![-4.3, -1.1, 2.0, -0.5, 1.5, -1.3, 0.7, 0.3, -0.1, 3.4];
let vars = variations.clone();
let vars2 = variations.clone();
let mut publisher = px.clone();
let publish1 = async move {
let mut idx = 0;
let mut price = 50000.0;
loop {
// sleep for 1 second
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// random percentage change
let change_percent = vars[idx];
price = price + (price * change_percent / 100.0);
let quote = Quote {
market: s!("BINANCE"),
id: s!("BTCUSDT"),
change_percent,
market_hours: MarketHourType::Regular,
price,
time: 198798798798,
};
println!("Publishing quote, {}", quote);
publisher.publish(quote).await?;
// keep the index within 0 and length of variations
idx = (idx + 1) % vars.len();
}
#[allow(unreachable_code)]
Result::<()>::Ok(())
};
let publish2 = async move {
let mut idx = 0;
let mut price = 300.0;
loop {
// sleep for 1 second
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// random percentage change
let change_percent = vars2[idx];
price = price + (price * change_percent / 100.0);
let quote = Quote {
market: s!("NASDAQ"),
id: s!("GOOGL"),
change_percent,
market_hours: MarketHourType::Regular,
price,
time: 198798798798,
};
println!("Publishing quote, {}", quote);
px.publish(quote).await?;
// keep the index within 0 and length of vars2
idx = (idx + 1) % vars2.len();
}
#[allow(unreachable_code)]
Result::<()>::Ok(())
};
let _ = tokio::join!(publish1, publish2);
Ok(())
}

26
examples/bus-sub.rs Normal file
View File

@ -0,0 +1,26 @@
use {
eyre::{set_hook, DefaultHandler, Result},
rustler_core::rustlers::{bus, Quote, Ticker},
rxrust::observable::{ObservableExt, ObservableItem},
};
#[tokio::main]
async fn main() -> Result<()> {
set_hook(Box::new(DefaultHandler::default_with))?;
let mut sx = bus::subscriber::<Quote, _>(&"redis://127.0.0.1/").await?;
let ticker = Ticker {
market: "BINANCE".to_string(),
symbol: "BTCUSDT".to_string(),
};
let _obs = sx.stream().await?.filter(move |quote| quote.belongs_to(&ticker)).subscribe(|v| {
println!("Received quote: {}", v);
});
// wait for 10 seconds
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
Ok(())
}

View File

@ -19,7 +19,7 @@ async fn main() -> Result<()> {
let mut rustler = RustlersSvc::new(
conn.clone(),
rustlerjar! {
"BINANCE" => FooRustler
"BINANCE" => FooRustler::create
},
)
.await;

View File

@ -1,7 +1,7 @@
use sea_orm_migration::{async_trait::async_trait, prelude::*};
#[derive(DeriveMigrationName)]
/// 🤠 » create table `market`
/// 🐎 » create table `market`
pub struct Migration;
#[async_trait]

View File

@ -1,7 +1,7 @@
use sea_orm_migration::{async_trait::async_trait, prelude::*};
#[derive(DeriveMigrationName)]
/// 🤠 » create table `ticker`
/// 🐎 » create table `ticker`
pub struct Migration;
#[async_trait]

View File

@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "market")]
/// 🤠 » market entity model
/// 🐎 » market entity model
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,

View File

@ -4,7 +4,7 @@ use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "ticker")]
/// 🤠 » ticker entity model
/// 🐎 » ticker entity model
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,

View File

@ -7,7 +7,7 @@ use {
sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel},
};
/// 🤠 » service for the `Market` entity
/// 🐎 » service for the `Market` entity
pub struct Service {
conn: DatabaseConnection,
}
@ -17,19 +17,19 @@ impl Service {
Self { conn }
}
/// 🤠 » gets all markets from the database
/// 🐎 » gets all markets from the database
pub async fn get_all(&self) -> Result<Vec<MarketModel>, DbErr> {
let markets = Market::find().all(&self.conn).await?;
Ok(markets)
}
/// 🤠 » gets a market by its id
/// 🐎 » gets a market by its id
pub async fn create(&self, market: MarketModel) -> Result<MarketModel, DbErr> {
Market::insert(market.clone().into_active_model()).exec(&self.conn).await?;
Ok(market)
}
/// 🤠 » gets all markets with their tickers
/// 🐎 » gets all markets with their tickers
pub async fn get_all_with_tickers(
&self,
) -> Result<Vec<(MarketModel, Vec<TickerModel>)>, DbErr> {

View File

@ -7,7 +7,7 @@ use {
sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter},
};
/// 🤠 » service for the `Ticker` entity
/// 🐎 » service for the `Ticker` entity
pub struct Service {
conn: DatabaseConnection,
}

View File

@ -12,7 +12,7 @@ use {
const RUSTLER_GRPC_API_ADDR: &str = "RUSTLER_GRPC_API_ADDR";
/// 🤠 » starts the rustler gRPC server
/// 🐎 » starts the rustler gRPC server
pub async fn start(conn: DatabaseConnection) -> Result<()> {
fn get_default_addr() -> String {
let addr = "0.0.0.0:50051";

View File

@ -47,7 +47,7 @@ impl Market {
}
}
/// 🤠 » grpc Server to manage market entities
/// 🐎 » grpc Server to manage market entities
pub struct GrpcServer {
pub(crate) svc: market::Service,
}
@ -59,7 +59,7 @@ impl GrpcServer {
}
}
/// 🤠 » creates the market api server
/// 🐎 » creates the market api server
pub fn svc(self) -> MarketApiServer<GrpcServer> {
MarketApiServer::new(self)
}

View File

@ -32,7 +32,7 @@ impl Ticker {
}
}
/// 🤠 » grpc Server to manage ticker entities
/// 🐎 » grpc Server to manage ticker entities
pub struct GrpcServer {
pub(crate) svc: ticker::Service,
}
@ -44,7 +44,7 @@ impl GrpcServer {
}
}
/// 🤠 » creates the ticker api server
/// 🐎 » creates the ticker api server
pub fn svc(self) -> TickerApiServer<GrpcServer> {
TickerApiServer::new(self)
}

123
lib/rustlers/bus/mod.rs Normal file
View File

@ -0,0 +1,123 @@
use {eyre::Result, redis::Client, std::fmt::Debug};
pub mod publish;
pub mod subscribe;
pub(crate) const KEY_PREFIX: &str = "rustler";
/// creates a message redis key from a prefix and a key
pub(crate) fn key<T: AsRef<str>, K: AsRef<str>>(prefix: T, key: K) -> String {
let prefix = prefix.as_ref();
let key = key.as_ref();
match prefix.is_empty() {
true => key.to_string(),
false => format!("{}:{}", prefix, key),
}
}
/// represents a pub or sub handler that can be prefixed
pub trait PrefixedPubSub {
fn get_prefix(&self) -> String;
fn set_prefix(&mut self, prefix: &str) -> &mut Self;
fn with_prefix(&mut self, prefix: &str) -> &mut Self {
self.set_prefix(prefix);
self
}
fn without_prefix(&mut self) -> &mut Self {
self.set_prefix("");
self
}
}
/// 🐎 » represents a an entity that can provide a redis client
pub trait RedisClient {
fn get_client(&self) -> Result<Client>;
}
impl RedisClient for Client {
fn get_client(&self) -> Result<Client> {
let redis = self.clone();
Ok(redis)
}
}
impl RedisClient for &str {
fn get_client(&self) -> Result<Client> {
let redis = Client::open(*self)?;
Ok(redis)
}
}
impl RedisClient for String {
fn get_client(&self) -> Result<Client> {
let redis = Client::open(self.as_str())?;
Ok(redis)
}
}
/// 🐎 » represents a value that can be serialized to a redis value
pub trait ToRedisVal {
fn to_redis_val(&self) -> Vec<(String, String)>;
}
/// 🐎 » represents a value that can be serialized to a redis key
pub trait ToRedisKey {
fn to_redis_key(&self) -> String;
}
/// 🐎 » represents a value that can be serialized to and from a redis message
pub trait ToFromRedisMessage {
fn as_message(&self) -> String;
fn from_message<T: AsRef<str>>(msg: T) -> Self;
}
/// 🐎 » supertrait combining all redis object traits + debug + send + sync + 'static
pub trait RedisMessage:
ToRedisVal + ToRedisKey + ToFromRedisMessage + Debug + Clone + Send + Sync + PartialEq + 'static
{
}
/// 🐎 » **publisher**: create bus publisher
///
/// **Arguments**
/// - `redis` - a redis client or a redis connection string
///
/// **Returns**
/// - a new `Publisher` instance
pub async fn publisher<RM: RedisMessage, RC: RedisClient>(
redis: &RC,
) -> Result<publish::Publisher<RM>> {
publish::Publisher::new(redis).await
}
/// 🐎 » **subscriber**: create bus subscriber
///
/// **Arguments**
/// - `redis` - a redis client or a redis connection string
///
/// **Returns**
/// - a new `Subscriber` instance
pub async fn subscriber<RM: RedisMessage, RC: RedisClient>(
redis: &RC,
) -> Result<subscribe::Subscriber<RM>> {
subscribe::Subscriber::new(redis).await
}
/// 🐎 » **pubsub*: create bus publisher and subscriber
///
/// **Arguments**
/// - `redis` - a redis client or a redis connection string
///
/// **Returns**
/// - a tuple containing a `Publisher` and a `Subscriber` instance
pub async fn pubsub<RO: RedisMessage, T: RedisClient>(
redis: &T,
) -> Result<(publish::Publisher<RO>, subscribe::Subscriber<RO>)> {
let publisher = publish::Publisher::new(redis).await?;
let subscriber = subscribe::Subscriber::new(redis).await?;
Ok((publisher, subscriber))
}

View File

@ -0,0 +1,52 @@
use {
super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX},
eyre::Result,
redis::{aio::MultiplexedConnection, AsyncCommands},
};
/// 🐎 » bus **Publisher**
#[derive(Clone)]
pub struct Publisher<RM: RedisMessage> {
conn: MultiplexedConnection,
key_prefix: String,
resource_type: std::marker::PhantomData<RM>,
}
impl<RM: RedisMessage> PrefixedPubSub for Publisher<RM> {
fn get_prefix(&self) -> String {
self.key_prefix.clone()
}
fn set_prefix(&mut self, prefix: &str) -> &mut Self {
self.key_prefix = prefix.to_string();
self
}
}
impl<RM: RedisMessage> Publisher<RM> {
/// 🐎 » create a new bus publisher
pub async fn new<RC>(redis: &RC) -> Result<Self>
where
RC: RedisClient,
{
let redis = redis.get_client()?;
let conn = redis.get_multiplexed_tokio_connection().await?;
Ok(Self {
conn,
key_prefix: KEY_PREFIX.to_string(),
resource_type: std::marker::PhantomData,
})
}
/// 🐎 » publish a message to the bus
pub async fn publish(&mut self, value: RM) -> Result<()> {
let obj_key = key(self.get_prefix(), value.to_redis_key());
// set hash key
self.conn.hset_multiple(&obj_key, value.to_redis_val().as_slice()).await?;
// publish to the appropriate channel
self.conn.publish(&obj_key, value.as_message()).await?;
Ok(())
}
}

View File

@ -0,0 +1,113 @@
use {
super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX},
eyre::Result,
futures::StreamExt,
lool::{fail, s},
rxrust::{
observable::BoxIt, observer::Observer, ops::box_it::CloneableBoxOpThreads,
subject::SubjectThreads, subscription::Subscription,
},
std::convert::Infallible,
};
// IDEA: create another version using tokio broadcast channels
// https://github.com/exein-io/pulsar/blob/99ad35c8d13eaf1a37d7b6a9dcb812a5a1231d00/crates/pulsar-core/src/bus.rs
/// 🐎 » bus **Subscriber**
pub struct Subscriber<RM: RedisMessage> {
// TODO: replace with storing tokio multiplexed connection like in publish.rs when redis@0.26.0
// is released see https://github.com/redis-rs/redis-rs/issues/1137.
// this way we can just clone the connection when needing instead of storing the
// redis client and creating a new connection
client: redis::Client,
subject: Option<SubjectThreads<RM, Infallible>>,
key_prefix: String,
pattern: String,
}
impl<RM: RedisMessage> PrefixedPubSub for Subscriber<RM> {
fn get_prefix(&self) -> String {
self.key_prefix.clone()
}
fn set_prefix(&mut self, prefix: &str) -> &mut Self {
self.key_prefix = s!(prefix);
self
}
}
impl<RM: RedisMessage> Subscriber<RM> {
/// 🐎 » create a new bus subscriber
pub async fn new<RC>(redis: &RC) -> Result<Self>
where
RC: RedisClient,
{
Ok(Self {
pattern: s!("*"),
client: redis.get_client()?,
key_prefix: s!(KEY_PREFIX),
subject: None,
})
}
pub fn with_pattern(&mut self, pattern: &str) -> &mut Self {
self.pattern = s!(pattern);
self
}
pub fn get_pattern(&self) -> String {
key(self.get_prefix(), self.pattern.clone())
}
/// 🐎 » subscribe to a channel
pub async fn start_streaming(&mut self) -> Result<()> {
if self.subject.is_none() {
self.subject = Some(SubjectThreads::default());
}
if self.subject.is_some() && self.subject.as_ref().unwrap().is_closed() {
drop(self.subject.take());
self.subject = Some(SubjectThreads::default());
}
let pattern = self.pattern.clone();
let mut conn = self.client.get_async_pubsub().await?;
let prefix = self.get_prefix();
if let Some(subject) = self.subject.as_mut() {
let mut stream = subject.clone();
tokio::spawn(async move {
conn.psubscribe(key(prefix, pattern)).await?;
let mut msg_stream = conn.on_message();
while let Some(msg) = msg_stream.next().await {
if let Ok(payload) = msg.get_payload::<String>() {
// TODO: handle possible panic when parsing message
// using catch_unwind
let message = RM::from_message(payload);
stream.next(message);
}
}
// if the connection with redis is closed, complete the stream
stream.complete();
Result::<()>::Ok(())
});
}
Ok(())
}
pub async fn stream(&mut self) -> Result<CloneableBoxOpThreads<RM, Infallible>> {
if self.subject.is_none() {
self.start_streaming().await?;
}
match self.subject.as_ref() {
Some(subject) => Ok(subject.clone().box_it()),
None => fail!("Could not start streaming messages from redis bus"),
}
}
}

View File

@ -1,6 +1,6 @@
mod rustler;
pub mod bus;
pub mod rustlerjar;
pub mod svc;
pub use rustler::*;

View File

@ -2,11 +2,16 @@ pub extern crate chrono;
pub extern crate eyre;
use {
super::bus::{RedisMessage, ToFromRedisMessage, ToRedisKey, ToRedisVal},
crate::entities::{market, ticker},
async_trait::async_trait,
chrono::{DateTime, Local},
eyre::Result,
std::collections::HashMap,
lool::s,
std::{
collections::HashMap,
fmt::{self, Display, Formatter},
},
};
#[derive(Debug, Clone, PartialEq, Eq, Default)]
@ -20,10 +25,28 @@ pub enum RustlerStatus {
#[derive(Debug, Clone)]
pub enum MarketHourType {
Pre,
Regular,
Post,
Extended,
Pre = 0,
Regular = 1,
Post = 2,
Extended = 3,
}
impl From<MarketHourType> for u8 {
fn from(market_hour_type: MarketHourType) -> Self {
market_hour_type as u8
}
}
impl From<u8> for MarketHourType {
fn from(market_hour_type: u8) -> Self {
match market_hour_type {
0 => MarketHourType::Pre,
1 => MarketHourType::Regular,
2 => MarketHourType::Post,
3 => MarketHourType::Extended,
_ => MarketHourType::Regular,
}
}
}
#[derive(Debug, Clone)]
@ -31,11 +54,105 @@ pub struct Quote {
pub id: String,
pub market: String,
pub price: f64,
pub change_percent: Option<f64>,
pub time: Option<i64>,
pub market_hours: Option<MarketHourType>,
pub change_percent: f64,
pub time: i64,
pub market_hours: MarketHourType,
}
impl Quote {
pub fn belongs_to(&self, ticker: &Ticker) -> bool {
self.id == ticker.symbol && self.market == ticker.market
}
}
impl Display for Quote {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl ToRedisVal for Quote {
fn to_redis_val(&self) -> Vec<(String, String)> {
let market_hours_u8: u8 = self.market_hours.clone().into();
vec![
(s!("id"), self.id.to_owned()),
(s!("market"), self.market.to_owned()),
(s!("price"), self.price.to_string()),
(s!("market_hours"), market_hours_u8.to_string()),
(s!("time"), self.time.to_string()),
(s!("change_percent"), self.change_percent.to_string()),
]
}
}
impl ToRedisKey for Quote {
fn to_redis_key(&self) -> String {
format!("quote:{}:{}", self.market, self.id)
}
}
impl ToFromRedisMessage for Quote {
fn as_message(&self) -> String {
// id¦market¦price¦change_percent¦time¦market_hours
format!(
"{}¦{}¦{}¦{}¦{}¦{}",
self.id,
self.market,
self.price,
self.change_percent,
self.time,
Into::<u8>::into(self.market_hours.clone())
)
}
/// 🐎 » creates a `Quote` from a message
///
/// the message should be in the format `id¦market¦price¦change_percent¦time¦market_hours`
///
/// **panics** if the message is not in the correct format
fn from_message<T: AsRef<str>>(msg: T) -> Self {
let msg = msg.as_ref();
let parts: Vec<&str> = msg.split('¦').collect();
let id = parts[0].to_string();
let market = parts[1].to_string();
let price = parts[2].parse::<f64>().unwrap();
let change_percent = parts[3].parse::<f64>().unwrap();
let time = parts[4].parse::<i64>().unwrap();
let market_hours = parts[5].parse::<u8>().unwrap().into();
Self {
id,
market,
price,
change_percent,
time,
market_hours,
}
}
}
impl PartialEq<Ticker> for Quote {
fn eq(&self, other: &Ticker) -> bool {
self.id == other.symbol && self.market == other.market
}
}
impl PartialEq<Quote> for Ticker {
fn eq(&self, other: &Quote) -> bool {
self.symbol == other.id && self.market == other.market
}
}
impl PartialEq<Quote> for Quote {
fn eq(&self, other: &Quote) -> bool {
self.id == other.id && self.market == other.market
}
}
impl RedisMessage for Quote {}
#[derive(Debug, Clone)]
pub struct RustlerOpts {
pub connect_on_start: bool,
@ -58,7 +175,7 @@ pub struct ScrapperCallbacks {
pub on_message: Option<fn(message: Quote) -> Result<()>>,
}
/// 🤠 » a scruct representing a ticker
/// 🐎 » a scruct representing a ticker
///
/// in `rustler` a ticker is the union between a symbol (stock identifier) and its market
///
@ -83,7 +200,7 @@ impl Ticker {
tickers.iter().map(|t| Self::from(t, market)).collect()
}
/// 🤠 » returns the key of the ticker
/// 🐎 » returns the key of the ticker
pub fn key(&self) -> String {
format!("{}:{}", self.market, self.symbol)
}
@ -130,17 +247,17 @@ pub trait RustlerAccessor {
#[async_trait]
pub trait Rustler: RustlerAccessor + Send + Sync {
// #region Unimplemented trait functions
/// 🤠 » fn called after tickers are added to the rustler
/// 🐎 » fn called after tickers are added to the rustler
fn on_add(&mut self, tickers: &[Ticker]) -> Result<()>;
/// 🤠 » fn called after tickers are deleted from the rustler
/// 🐎 » fn called after tickers are deleted from the rustler
fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()>;
/// 🤠 » connects the rustler to the data source
/// 🐎 » connects the rustler to the data source
async fn connect(&mut self) -> Result<()>;
/// 🤠 » disconnects the rustler from the data source
/// 🐎 » disconnects the rustler from the data source
async fn disconnect(&mut self) -> Result<()>;
// #endregion
/// 🤠 » starts the rustler
/// 🐎 » starts the rustler
async fn start(&mut self) -> Result<()> {
let opts = self.opts();
if opts.connect_on_start {
@ -149,7 +266,7 @@ pub trait Rustler: RustlerAccessor + Send + Sync {
Ok(())
}
/// 🤠 » updates last stop and last run times and calls the appropriate callback
/// 🐎 » updates last stop and last run times and calls the appropriate callback
///
/// should be called after the status of the rustler changes
fn handle_status_change(&mut self) -> Result<()> {
@ -340,7 +457,7 @@ macro_rules! rustler_accessors {
};
}
/// **🤠 » rustler builder macro**
/// **🐎 » rustler builder macro**
///
/// The `rustler!` macro is used to define a new `Rustler` struct, expanding the struct definition
/// with the required fields and derives, and implementing the `RustlerAccessor` trait for the

View File

@ -5,7 +5,7 @@ use {
tokio::sync::Mutex,
};
/// **🤠 » rustlerjar! macro**
/// **🐎 » rustlerjar! macro**
///
/// A macro to create a `RustlerJar` with multiple Rustler instances and their corresponding
/// mappings.
@ -14,20 +14,20 @@ use {
///
/// ```rust
/// let rustler_jar = rustlerjar! {
/// "NYSE", "NASDAQ" => FooRustler,
/// "BINANCE" => BarRustler,
/// "NYSE", "NASDAQ" => FooRustler::create,
/// "BINANCE" => BarRustler::create(url),
/// };
/// ```
#[macro_export]
macro_rules! rustlerjar {
($($($name:expr),* => $rustler:ident),* $(,)?) => {{
($($($name:expr),* => $rustler:expr),* $(,)?) => {{
use $crate::rustlers::RustlerAccessor;
let mut instances: Vec<Box<dyn $crate::rustlers::Rustler>> = Vec::new();
let mut mappings = std::collections::HashMap::new();
$(
let instance = Box::new($rustler::create());
let instance = Box::new($rustler());
$(
mappings.insert($name.to_string(), instance.name());
)*
@ -38,7 +38,7 @@ macro_rules! rustlerjar {
}};
}
/// **🤠 » RustlerJar**
/// **🐎 » RustlerJar**
///
/// A `RustlerJar` is a collection of Rustlers and their corresponding mappings to the markets.
/// Which indicates which Rustler should be used for a given market. Rustlers are stored as
@ -50,8 +50,8 @@ macro_rules! rustlerjar {
/// The easiest way to create a `RustlerJar` is by using the `rustlerjar!` macro.
/// ```rust
/// let rustler_jar = rustlerjar! {
/// "NYSE", "NASDAQ" => FooRustler,
/// "BINANCE" => BarRustler,
/// "NYSE", "NASDAQ" => FooRustler::create,
/// "BINANCE" => BarRustler::create(url),
/// };
///
/// let rustler = rustler_jar.get(&market);

View File

@ -33,7 +33,7 @@ use {
// stopJob?: Job,
// }
/// **🤠 » Rustlers Service**
/// **🐎 » Rustlers Service**
///
/// The `RustlersSvc` is a service that manages the rustlers and orchestrates their executions.
pub struct RustlersSvc {
@ -43,7 +43,7 @@ pub struct RustlersSvc {
}
impl RustlersSvc {
/// **🤠 » create service**
/// **🐎 » create service**
///
/// creates a new instance of the `RustlersSvc`
///
@ -64,7 +64,7 @@ impl RustlersSvc {
}
}
/// **🤠 » start rustlers**
/// **🐎 » start rustlers**
///
/// gets market data from the the database and starts
/// the corresponding rustler for each market
@ -79,7 +79,7 @@ impl RustlersSvc {
Ok(())
}
/// **🤠 » restart rustlers**
/// **🐎 » restart rustlers**
///
/// stops all rustlers and then starts them again
pub async fn restart(&self) -> Result<()> {

View File

@ -1 +1 @@
// TODO: websocket gateway
// TODO: websocket gateway with tokio-tungstenite