diff --git a/.github/img/diagram.svg b/.github/img/diagram.svg new file mode 100644 index 0000000..790e5e6 --- /dev/null +++ b/.github/img/diagram.svg @@ -0,0 +1,813 @@ + + + + + + + + + + + + + + + rustlers: RustlerJar + + + channerl: (Sender, Receiver) + + + publisher: dyn PublisherTrait + + + + start(): Result<()> + + + + + + + + + RustlerSvc + + + + + + + + + + + + + + + + + + rustlers: RustlerJar + + + + + + + + + RustlerJar + + + + + + + + + + + + + Rustler + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + publisher + [bus] + + + + + + + + + + + + + + + + + RustlerSvc + + + + + + + + + + + + + + + + + + + + + + start + + + + + + + + + + + + + + + + + + start + receiving + + + + + + + + + + + + + Rustler + + + + + + + + + + + + + + + + + + + + + + + + + + add + tickers + + + + + + + + + + + + + Rustler + [thread] + + + + + + + + + + + + + + + + + + + connect + and start + rustling + (optional) + + + + + + + + + + + + + + + + + + connect + + + + + + + + + + + + + external + source + + + + + + + + + + + + + + + + + + + + + + + + + + + + quote + msg + + + + + + + + + + + + + + + + + + sender.send(quote) + + + + + + + + + + + + + + + + + + publish quote + [bus] + + + + + + + + + + + + + RustlerSvc + [thread] + + + + + + + recv + + + Redis + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + hset + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.github/img/rustler.svg b/.github/img/rustler.svg index 3e148fd..1359691 100644 --- a/.github/img/rustler.svg +++ b/.github/img/rustler.svg @@ -5,7 +5,5 @@ .a { fill: #ffffff; } } - - - + \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 18edc72..428cdc6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,13 +4,13 @@ // "migration": true, // "entities": true, - // โš™๏ธ + // โš™๏ธ // ".env": true, "Taskfile.yaml": true, ".cocorc": true, ".task": true, ".cargo": true, - ".github": true, + // ".github": true, "rustfmt.toml": true, // "**/**/Cargo.toml": true, @@ -32,6 +32,7 @@ // "rustler.db": true, "rustler.db-shm": true, "rustler.db-wal": true, - "rustler.db.bkp": true, + "rustler.db.bkp": true } -} \ No newline at end of file + // "editor.formatOnSave": true +} diff --git a/README.md b/README.md index e0c82dc..e0221d0 100644 --- a/README.md +++ b/README.md @@ -14,33 +14,40 @@ ## Why "rustler" -A `rustler` is a person who steals live***stock***. Well, this library is a service that collects _stock_ market data from the internet. So, it's a "_rustler_" for stock market data. +A `rustler` is a person who steals live**_stock_**. Well, this library is a service that collects _stock_ market data from the internet. So, it's a "_rustler_" for stock market data. -Also, this library is built using the `Rust` programming language... so, ***rust***ler ๐Ÿ˜Š +Also, this library is built using the `Rust` programming language... so, **_rust_**ler ๐Ÿ˜Š ## What this library includes This library defines the core functionality for a `rustler`. It includes the following: -- A [`Rustler`](./lib/rustlers/rustlers.rs) trait that defines the core functionality for a `rustler`. -- A [`RustlersSvc`](./lib/rustlers/svc.rs) which orchestrates the `rustlers` at runtime, scheduling them to scrape stock pricing data between market hours. +- A [`Rustler`](./lib/rustlers/rustlers.rs) trait that defines the core functionality for a `rustler`. +- A [`RustlersSvc`](./lib/rustlers/svc.rs) which orchestrates the `rustlers` at runtime, scheduling them to scrape stock pricing data between market hours. Apart from the above, this library also defines: -- a [database schema](./lib/entities/orm/) for storing market hours, which is used by the `RustlersSvc` to schedule the `rustlers`. -- initial [database migrations](./lib/entities/migration) to create the schema. -- a [grpc server](./lib/grpc/) to interact with the rustlers database. -- unimplemented a [websocket gateway server](./lib/socket/) to stream stock pricing data to subscribed clients +- a [database schema](./lib/entities/orm/) for storing market hours, which is used by the `RustlersSvc` to schedule the `rustlers`. +- initial [database migrations](./lib/entities/migration) to create the schema. +- a [grpc server](./lib/grpc/) to interact with the rustlers database. +- unimplemented a [websocket gateway server](./lib/socket/) to stream stock pricing data to subscribed clients > [!NOTE] -> +> > This library defines a _rustler_ as a service that scrapes stock pricing data for a > particular market. -> +> > Although this library contains the core and abstract functionality for the rustlers, it doesn't include any concrete implementation for them. -> +> > Actual concrete implementations for each market cannot be published for many reasons. +## Diagram + +The following diagram shows the core components of the `rustler-core` library and how they interact +with each other. + +

+ ## Example Check the [examples](./examples) directory for an example of how to use this library. diff --git a/examples/binance/mod.rs b/examples/binance/mod.rs index 46d4609..e1e11b3 100644 --- a/examples/binance/mod.rs +++ b/examples/binance/mod.rs @@ -1,12 +1,16 @@ use { async_trait::async_trait, - eyre::Result, - lool::logger::info, + eyre::{OptionExt, Result}, + lool::{ + logger::{debug, error, info}, + s, + }, rustler_core::{ rustler, - rustlers::{Rustler, RustlerAccessor, RustlerStatus, Ticker}, + rustlers::{svc::quote, MarketHourType, Rustler, RustlerAccessor, RustlerStatus, Ticker}, }, std::collections::HashMap, + tokio::select, }; rustler!( @@ -26,6 +30,41 @@ impl FooRustler { Self::default() } } + + async fn start_rustling(&mut self) -> Result<()> { + let sender = self.msg_sender().as_ref().ok_or_eyre("Sender not found")?.clone(); + + tokio::spawn(async move { + debug!("Starting rustling"); + select! { + _ = async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let result = sender + .send(quote( + s!("BTCUSDT"), + s!("BINANCE"), + 50000.0, + 0.0, + 198798798798, + MarketHourType::Regular, + )) + .await; + + if let Err(e) = result { + error!("Failed to send message: {}", e); + } + } + } => { + error!("Rustling stopped"); + }, + + } + }); + + Ok(()) + } } #[async_trait] @@ -37,7 +76,9 @@ impl Rustler for FooRustler { self.set_status(RustlerStatus::Connecting)?; - info!("Connecting to data source"); + info!("(mock) Connecting to data source"); + self.start_rustling().await?; + info!("(mock) Connected to data source"); self.set_status(RustlerStatus::Connected)?; @@ -51,23 +92,20 @@ impl Rustler for FooRustler { } self.set_status(RustlerStatus::Disconnecting)?; - - info!("Disconnecting from data source"); - + info!("(mock) Disconnecting from data source"); self.set_status(RustlerStatus::Disconnected)?; + info!("(mock) Disconnected from data source"); Ok(()) } fn on_add(&mut self, tickers: &[Ticker]) -> Result<()> { - info!("Adding tickers: {:?}", tickers); - + info!("(mock) Adding tickers: {:?}", tickers); Ok(()) } fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()> { - info!("Deleting tickers: {:?}", tickers); - + info!("(mock) Deleting tickers: {:?}", tickers); Ok(()) } } diff --git a/examples/bus-pub.rs b/examples/bus-pub.rs index 13a2ec4..51de13d 100644 --- a/examples/bus-pub.rs +++ b/examples/bus-pub.rs @@ -3,13 +3,16 @@ mod binance; use { eyre::{set_hook, DefaultHandler, Result}, lool::s, - rustler_core::rustlers::{bus, MarketHourType, Quote}, + rustler_core::rustlers::{ + bus::{self, PublisherTrait}, + MarketHourType, Quote, + }, }; #[tokio::main] async fn main() -> Result<()> { set_hook(Box::new(DefaultHandler::default_with))?; - let mut px = bus::publisher(&"redis://127.0.0.1/").await?; + let mut px = bus::redis::publisher(&"redis://127.0.0.1/").await?; let variations = vec![-4.3, -1.1, 2.0, -0.5, 1.5, -1.3, 0.7, 0.3, -0.1, 3.4]; let vars = variations.clone(); diff --git a/examples/bus-sub.rs b/examples/bus-sub.rs index dafe12e..0c6f765 100644 --- a/examples/bus-sub.rs +++ b/examples/bus-sub.rs @@ -1,6 +1,9 @@ use { eyre::{set_hook, DefaultHandler, Result}, - rustler_core::rustlers::{bus, Quote, Ticker}, + rustler_core::rustlers::{ + bus::{self, SubscriberTrait}, + Quote, Ticker, + }, rxrust::observable::{ObservableExt, ObservableItem}, }; @@ -8,7 +11,7 @@ use { async fn main() -> Result<()> { set_hook(Box::new(DefaultHandler::default_with))?; - let mut sx = bus::subscriber::(&"redis://127.0.0.1/").await?; + let mut sx = bus::redis::subscriber::(&"redis://127.0.0.1/").await?; let ticker = Ticker { market: "BINANCE".to_string(), diff --git a/examples/rustler.rs b/examples/rustler.rs index d2e7038..a8c69cf 100644 --- a/examples/rustler.rs +++ b/examples/rustler.rs @@ -5,7 +5,11 @@ use { dotenvy::dotenv, eyre::{set_hook, DefaultHandler, Result}, lool::logger::{info, ConsoleLogger, Level}, - rustler_core::{entities::db::get_connection, grpc, rustlerjar, rustlers::svc::RustlersSvc}, + rustler_core::{ + entities::db::get_connection, + grpc, rustlerjar, + rustlers::{bus, svc::RustlersSvc, Quote}, + }, tokio::join, }; @@ -15,12 +19,15 @@ async fn main() -> Result<()> { ConsoleLogger::default_setup(Level::Trace, "rustler")?; dotenv()?; + let publisher = bus::redis::publisher::(&"redis://127.0.0.1/").await?; + let conn = get_connection().await?; let mut rustler = RustlersSvc::new( conn.clone(), rustlerjar! { "BINANCE" => FooRustler::create }, + publisher, ) .await; diff --git a/lib/rustlers/bus.rs b/lib/rustlers/bus.rs new file mode 100644 index 0000000..7bb22f3 --- /dev/null +++ b/lib/rustlers/bus.rs @@ -0,0 +1,45 @@ +use std::{convert::Infallible, fmt::Debug}; + +use eyre::Result; +use rxrust::ops::box_it::CloneableBoxOpThreads; +use tonic::async_trait; + +pub mod redis; + +/// ๐ŸŽ ยป represents a value that can be serialized to a bus value +pub trait ToBusVal { + fn to_bus_val(&self) -> Vec<(String, String)>; +} + +/// ๐ŸŽ ยป represents a value that can be serialized to a bus key +pub trait ToBusKey { + fn to_bus_key(&self) -> String; +} + +/// ๐ŸŽ ยป represents a value that can be serialized to and from a bus message +pub trait ToFromBusMessage { + fn as_message(&self) -> String; + fn from_message>(msg: T) -> Self; +} + +/// ๐ŸŽ ยป supertrait combining all bus object traits + debug + send + sync + 'static +pub trait BusMessage: + ToBusVal + ToBusKey + ToFromBusMessage + Debug + Clone + Send + Sync + PartialEq + 'static +{ +} + +/// ๐ŸŽ ยป trait for bus **Publisher**s +#[async_trait] +pub trait PublisherTrait { + /// ๐ŸŽ ยป publish a message to the bus + async fn publish(&mut self, value: RM) -> Result<()>; +} + +/// ๐ŸŽ ยป trait for bus **Publisher**s +#[async_trait] +pub trait SubscriberTrait { + /// ๐ŸŽ ยป **stream** + /// + /// returns an `Observable` stream of messages from the redis bus + async fn stream(&mut self) -> Result>; +} diff --git a/lib/rustlers/bus/mod.rs b/lib/rustlers/bus/redis/mod.rs similarity index 61% rename from lib/rustlers/bus/mod.rs rename to lib/rustlers/bus/redis/mod.rs index dfbc298..e7274f8 100644 --- a/lib/rustlers/bus/mod.rs +++ b/lib/rustlers/bus/redis/mod.rs @@ -1,4 +1,4 @@ -use {eyre::Result, redis::Client, std::fmt::Debug}; +use {super::BusMessage, eyre::Result, redis::Client}; pub mod publish; pub mod subscribe; @@ -58,28 +58,6 @@ impl RedisClient for String { } } -/// ๐ŸŽ ยป represents a value that can be serialized to a redis value -pub trait ToRedisVal { - fn to_redis_val(&self) -> Vec<(String, String)>; -} - -/// ๐ŸŽ ยป represents a value that can be serialized to a redis key -pub trait ToRedisKey { - fn to_redis_key(&self) -> String; -} - -/// ๐ŸŽ ยป represents a value that can be serialized to and from a redis message -pub trait ToFromRedisMessage { - fn as_message(&self) -> String; - fn from_message>(msg: T) -> Self; -} - -/// ๐ŸŽ ยป supertrait combining all redis object traits + debug + send + sync + 'static -pub trait RedisMessage: - ToRedisVal + ToRedisKey + ToFromRedisMessage + Debug + Clone + Send + Sync + PartialEq + 'static -{ -} - /// ๐ŸŽ ยป **publisher**: create bus publisher /// /// **Arguments** @@ -87,10 +65,10 @@ pub trait RedisMessage: /// /// **Returns** /// - a new `Publisher` instance -pub async fn publisher( +pub async fn publisher( redis: &RC, -) -> Result> { - publish::Publisher::new(redis).await +) -> Result> { + publish::RedisPublisher::new(redis).await } /// ๐ŸŽ ยป **subscriber**: create bus subscriber @@ -100,10 +78,10 @@ pub async fn publisher( /// /// **Returns** /// - a new `Subscriber` instance -pub async fn subscriber( +pub async fn subscriber( redis: &RC, -) -> Result> { - subscribe::Subscriber::new(redis).await +) -> Result> { + subscribe::RedisSubscriber::new(redis).await } /// ๐ŸŽ ยป **pubsub*: create bus publisher and subscriber @@ -113,11 +91,11 @@ pub async fn subscriber( /// /// **Returns** /// - a tuple containing a `Publisher` and a `Subscriber` instance -pub async fn pubsub( +pub async fn pubsub( redis: &T, -) -> Result<(publish::Publisher, subscribe::Subscriber)> { - let publisher = publish::Publisher::new(redis).await?; - let subscriber = subscribe::Subscriber::new(redis).await?; +) -> Result<(publish::RedisPublisher, subscribe::RedisSubscriber)> { + let publisher = publish::RedisPublisher::new(redis).await?; + let subscriber = subscribe::RedisSubscriber::new(redis).await?; Ok((publisher, subscriber)) } diff --git a/lib/rustlers/bus/publish.rs b/lib/rustlers/bus/redis/publish.rs similarity index 65% rename from lib/rustlers/bus/publish.rs rename to lib/rustlers/bus/redis/publish.rs index eaf3419..0ffc4c3 100644 --- a/lib/rustlers/bus/publish.rs +++ b/lib/rustlers/bus/redis/publish.rs @@ -1,20 +1,22 @@ use { - super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX}, + super::{key, BusMessage, PrefixedPubSub, RedisClient, KEY_PREFIX}, + crate::rustlers::bus::PublisherTrait, eyre::Result, redis::{aio::MultiplexedConnection, AsyncCommands}, + tonic::async_trait, }; /// ๐ŸŽ ยป bus **Publisher** /// /// allows to push a message or resource to the bus #[derive(Clone)] -pub struct Publisher { +pub struct RedisPublisher { conn: MultiplexedConnection, key_prefix: String, resource_type: std::marker::PhantomData, } -impl PrefixedPubSub for Publisher { +impl PrefixedPubSub for RedisPublisher { fn get_prefix(&self) -> String { self.key_prefix.clone() } @@ -25,7 +27,7 @@ impl PrefixedPubSub for Publisher { } } -impl Publisher { +impl RedisPublisher { /// ๐ŸŽ ยป create a new bus publisher pub async fn new(redis: &RC) -> Result where @@ -40,12 +42,15 @@ impl Publisher { resource_type: std::marker::PhantomData, }) } +} +#[async_trait] +impl PublisherTrait for RedisPublisher { /// ๐ŸŽ ยป publish a message to the bus - pub async fn publish(&mut self, value: RM) -> Result<()> { - let obj_key = key(self.get_prefix(), value.to_redis_key()); + async fn publish(&mut self, value: RM) -> Result<()> { + let obj_key = key(self.get_prefix(), value.to_bus_key()); // set hash key - self.conn.hset_multiple(&obj_key, value.to_redis_val().as_slice()).await?; + self.conn.hset_multiple(&obj_key, value.to_bus_val().as_slice()).await?; // publish to the appropriate channel self.conn.publish(&obj_key, value.as_message()).await?; diff --git a/lib/rustlers/bus/subscribe.rs b/lib/rustlers/bus/redis/subscribe.rs similarity index 89% rename from lib/rustlers/bus/subscribe.rs rename to lib/rustlers/bus/redis/subscribe.rs index c7c3fec..4239ff8 100644 --- a/lib/rustlers/bus/subscribe.rs +++ b/lib/rustlers/bus/redis/subscribe.rs @@ -1,5 +1,6 @@ use { - super::{key, PrefixedPubSub, RedisClient, RedisMessage, KEY_PREFIX}, + super::{key, PrefixedPubSub, RedisClient, KEY_PREFIX}, + crate::rustlers::bus::{BusMessage, SubscriberTrait}, eyre::Result, futures::StreamExt, lool::{fail, s}, @@ -8,6 +9,7 @@ use { subject::SubjectThreads, subscription::Subscription, }, std::convert::Infallible, + tonic::async_trait, }; // IDEA: create another version using tokio broadcast channels @@ -16,7 +18,7 @@ use { /// ๐ŸŽ ยป bus **Subscriber** /// /// allows to subscribe to a redis key pattern and receive messages from the redis bus -pub struct Subscriber { +pub struct RedisSubscriber { // TODO: replace with storing tokio multiplexed connection like in publish.rs when redis@0.26.0 // is released see https://github.com/redis-rs/redis-rs/issues/1137. // this way we can just clone the connection when needing instead of storing the @@ -27,7 +29,7 @@ pub struct Subscriber { pattern: String, } -impl PrefixedPubSub for Subscriber { +impl PrefixedPubSub for RedisSubscriber { fn get_prefix(&self) -> String { self.key_prefix.clone() } @@ -38,7 +40,7 @@ impl PrefixedPubSub for Subscriber { } } -impl Subscriber { +impl RedisSubscriber { /// ๐ŸŽ ยป create a new bus subscriber pub async fn new(redis: &RC) -> Result where @@ -63,20 +65,6 @@ impl Subscriber { key(self.get_prefix(), self.pattern.clone()) } - /// ๐ŸŽ ยป **stream** - /// - /// returns an `Observable` stream of messages from the redis bus - pub async fn stream(&mut self) -> Result> { - if self.subject.is_none() { - self.start_streaming().await?; - } - - match self.subject.as_ref() { - Some(subject) => Ok(subject.clone().box_it()), - None => fail!("Could not start streaming messages from redis bus"), - } - } - /// subscribe to the redis feed async fn start_streaming(&mut self) -> Result<()> { if self.subject.is_none() { @@ -118,3 +106,20 @@ impl Subscriber { Ok(()) } } + +#[async_trait] +impl SubscriberTrait for RedisSubscriber { + /// ๐ŸŽ ยป **stream** + /// + /// returns an `Observable` stream of messages from the redis bus + async fn stream(&mut self) -> Result> { + if self.subject.is_none() { + self.start_streaming().await?; + } + + match self.subject.as_ref() { + Some(subject) => Ok(subject.clone().box_it()), + None => fail!("Could not start streaming messages from redis bus"), + } + } +} diff --git a/lib/rustlers/rustler.rs b/lib/rustlers/rustler.rs index c7bfef5..22c1fbe 100644 --- a/lib/rustlers/rustler.rs +++ b/lib/rustlers/rustler.rs @@ -2,7 +2,10 @@ pub extern crate chrono; pub extern crate eyre; use { - super::bus::{RedisMessage, ToFromRedisMessage, ToRedisKey, ToRedisVal}, + super::{ + bus::{BusMessage, ToBusKey, ToBusVal, ToFromBusMessage}, + svc::RustlerMsg, + }, crate::entities::{market, ticker}, async_trait::async_trait, chrono::{DateTime, Local}, @@ -12,6 +15,7 @@ use { collections::HashMap, fmt::{self, Display, Formatter}, }, + tokio::sync::mpsc::Sender, }; /// ๐ŸŽ ยป a struct representing the status of a rustler at a given time @@ -75,8 +79,8 @@ impl Display for Quote { } } -impl ToRedisVal for Quote { - fn to_redis_val(&self) -> Vec<(String, String)> { +impl ToBusVal for Quote { + fn to_bus_val(&self) -> Vec<(String, String)> { let market_hours_u8: u8 = self.market_hours.clone().into(); vec![ @@ -90,13 +94,13 @@ impl ToRedisVal for Quote { } } -impl ToRedisKey for Quote { - fn to_redis_key(&self) -> String { +impl ToBusKey for Quote { + fn to_bus_key(&self) -> String { format!("quote:{}:{}", self.market, self.id) } } -impl ToFromRedisMessage for Quote { +impl ToFromBusMessage for Quote { /// ๐ŸŽ ยป converts a `Quote` to a serialized message that can be sent over a redis channel /// /// the message is in the format `idยฆmarketยฆpriceยฆchange_percentยฆtimeยฆmarket_hours` @@ -158,7 +162,7 @@ impl PartialEq for Quote { } } -impl RedisMessage for Quote {} +impl BusMessage for Quote {} #[derive(Debug, Clone)] pub struct RustlerOpts { @@ -175,13 +179,6 @@ impl Default for RustlerOpts { } } -#[derive(Debug, Clone, Default)] -pub struct ScrapperCallbacks { - pub on_connected: Option Result<()>>, - pub on_disconnected: Option Result<()>>, - pub on_message: Option Result<()>>, -} - /// ๐ŸŽ ยป a scruct representing a ticker /// /// in `rustler` a ticker is the union between a symbol (stock identifier) and its market @@ -246,8 +243,9 @@ pub trait RustlerAccessor { fn tickers_mut(&mut self) -> &mut HashMap; fn set_tickers(&mut self, tickers: HashMap); - fn callbacks(&self) -> &Option; - fn set_callbacks(&mut self, callbacks: Option); + fn msg_sender(&self) -> &Option>; + fn msg_sender_mut(&mut self) -> &mut Option>; + fn set_msg_sender(&mut self, sender: Option>); // #endregion } @@ -281,20 +279,20 @@ pub trait Rustler: RustlerAccessor + Send + Sync { RustlerStatus::Disconnected => { self.set_last_stop(Some(Local::now())); - if let Some(callbacks) = self.callbacks() { - if let Some(on_disconnected) = callbacks.on_disconnected { - on_disconnected()?; - } - } + // if let Some(callbacks) = self.callbacks() { + // if let Some(on_disconnected) = callbacks.on_disconnected { + // on_disconnected()?; + // } + // } } RustlerStatus::Connected => { self.set_last_run(Some(Local::now())); - if let Some(callbacks) = self.callbacks() { - if let Some(on_connected) = callbacks.on_connected { - on_connected()?; - } - } + // if let Some(callbacks) = self.callbacks() { + // if let Some(on_connected) = callbacks.on_connected { + // on_connected()?; + // } + // } } _ => {} }; @@ -343,17 +341,6 @@ pub trait Rustler: RustlerAccessor + Send + Sync { self.on_delete(new_tickers)?; Ok(()) } - - /// registers a new quote by passing it to the on_message callback - fn register_quote(&self, quote: Quote) -> Result<()> { - if let Some(callbacks) = self.callbacks() { - if let Some(on_message) = callbacks.on_message { - on_message(quote)?; - } - } - - Ok(()) - } } /// macro that expands to the accessor functions for a `Rustler` struct @@ -458,11 +445,21 @@ macro_rules! rustler_accessors { fn set_tickers(&mut self, tickers: HashMap) { self.tickers = tickers; } - fn callbacks(&self) -> &Option<$crate::rustlers::ScrapperCallbacks> { - &self.callbacks + fn msg_sender( + &self, + ) -> &Option> { + &self.msg_sender } - fn set_callbacks(&mut self, callbacks: Option<$crate::rustlers::ScrapperCallbacks>) { - self.callbacks = callbacks; + fn msg_sender_mut( + &mut self, + ) -> &mut Option> { + &mut self.msg_sender + } + fn set_msg_sender( + &mut self, + sender: Option>, + ) { + self.msg_sender = sender; } }; } @@ -491,7 +488,7 @@ macro_rules! rustler { last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, opts: $crate::rustlers::RustlerOpts, tickers: HashMap, - callbacks: Option<$crate::rustlers::ScrapperCallbacks>, + msg_sender: Option>, $($fields)* } diff --git a/lib/rustlers/svc.rs b/lib/rustlers/svc.rs index 921eb76..e025ca6 100644 --- a/lib/rustlers/svc.rs +++ b/lib/rustlers/svc.rs @@ -2,8 +2,12 @@ use { super::{ rustler::{Rustler, Ticker}, rustlerjar::RustlerJar, + MarketHourType, + }, + crate::{ + entities::{market, sea_orm::DatabaseConnection, ticker}, + rustlers::{bus::PublisherTrait, Quote}, }, - crate::entities::{market, sea_orm::DatabaseConnection, ticker}, eyre::Result, lool::{ logger::{info, warn}, @@ -13,36 +17,54 @@ use { }, }, std::sync::Arc, - tokio::sync::Mutex, + tokio::{ + spawn, + sync::{mpsc::Sender, Mutex}, + }, }; -// interface MarketExecData { -// entity: MarketModel; -// startJob?: Job; -// stopJob?: Job; -// } +/// **๐ŸŽ ยป Rustler Message** +pub enum RustlerMsg { + QuoteMsg(Quote), +} -// interface ScrapperData { -// markets?: MarketExecData[]; -// // subscription?: Subscription -// }]] - -// struct MarketExecData { -// entity: MarketModel, -// startJob?: Job, -// stopJob?: Job, -// } +/// **๐ŸŽ ยป create a quote message** +#[inline] +pub fn quote( + id: String, + market: String, + price: f64, + change_percent: f64, + time: i64, + market_hours: MarketHourType, +) -> RustlerMsg { + RustlerMsg::QuoteMsg(Quote { + id, + market, + price, + change_percent, + time, + market_hours, + }) +} /// **๐ŸŽ ยป Rustlers Service** /// /// The `RustlersSvc` is a service that manages the rustlers and orchestrates their executions. -pub struct RustlersSvc { +pub struct RustlersSvc

+where + P: PublisherTrait + Send + Sync + 'static + Clone, +{ market_svc: market::Service, sched: Scheduler, rustlers: RustlerJar, + publisher: P, } -impl RustlersSvc { +impl RustlersSvc +where + Publisher: PublisherTrait + Send + Sync + 'static + Clone, +{ /// **๐ŸŽ ยป create service** /// /// creates a new instance of the `RustlersSvc` @@ -53,7 +75,7 @@ impl RustlersSvc { /// /// **Returns** /// the created `RustlersSvc` instance - pub async fn new(conn: DatabaseConnection, rustlers: RustlerJar) -> Self { + pub async fn new(conn: DatabaseConnection, rustlers: RustlerJar, publisher: Publisher) -> Self { let market_svc = market::Service::new(conn).await; let sched = Scheduler::new(); @@ -61,6 +83,7 @@ impl RustlersSvc { market_svc, rustlers, sched, + publisher, } } @@ -72,8 +95,26 @@ impl RustlersSvc { info!("Starting rustlers"); let markets = self.market_svc.get_all_with_tickers().await?; - for (market, tickers) in markets { - self.schedule_rustler_for((market, tickers)).await?; + if markets.len() > 0 { + let (sender, mut receiver) = tokio::sync::mpsc::channel(100); + + for (market, tickers) in markets { + self.schedule_rustler_for((market, tickers), sender.clone()).await?; + } + + let mut publisher = self.publisher.clone(); + spawn(async move { + while let Some(msg) = receiver.recv().await { + match msg { + RustlerMsg::QuoteMsg(quote) => publisher.publish(quote).await?, + } + } + + info!("Rustler message receiver stopped"); + Ok::<(), eyre::Report>(()) + }); + } else { + warn!("No markets found with tickers"); } Ok(()) @@ -101,6 +142,7 @@ impl RustlersSvc { async fn schedule_rustler_for( &mut self, market: (market::Model, Vec), + sender: Sender, ) -> Result<()> { let (market, tickers) = market; let tickers: Vec = tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); @@ -109,6 +151,12 @@ impl RustlersSvc { let rustler = self.rustlers.get(&market); if let Some(rustler) = rustler { + { + let mut rustler = rustler.lock().await; + info!("Setting message sender for rustler '{}'", rustler.name()); + rustler.set_msg_sender(Some(sender)) + } + if let Some((start, stop)) = rules { let start_name = format!("start-rustler-{}", market.short_name); let end_name = format!("end-rustler-{}", market.short_name); diff --git a/๐ŸŽ rustler-core.code-workspace b/๐ŸŽ rustler-core.code-workspace index 2c07b60..0726f56 100644 --- a/๐ŸŽ rustler-core.code-workspace +++ b/๐ŸŽ rustler-core.code-workspace @@ -7,15 +7,26 @@ "settings": { "terminal.integrated.env.windows": { // allows to run the command in the terminal for development purposes - "PATH": "${workspaceFolder}/target/debug;${env:PATH}", + "PATH": "${workspaceFolder}/target/debug;${env:PATH}" }, "lucodear-icons.activeIconPack": "rust_ferris", "lucodear-icons.folders.associations": { - ".cargo": "rust", - }, - "lucodear-icons.files.associations": { - + ".cargo": "rust" }, + "lucodear-icons.folders.customClones": [ + { + "name": "redis", + "base": "database", + "color": "deep-orange-400", + "folderNames": ["redis"] + }, + { + "name": "bus", + "base": "pipe", + "color": "blue-500", + "folderNames": ["bus"] + } + ], "rust-analyzer.checkOnSave": true } }