From cbf6ba1f5aaf54c52f8d1d0343348349dee0d61e Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Mon, 22 Apr 2024 05:18:03 -0300 Subject: [PATCH] =?UTF-8?q?wip:=20=F0=9F=9A=A7=20rustlers=20core=20further?= =?UTF-8?q?=20development?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/main.rs | 9 +- entities/src/lib.rs | 19 ++-- entities/src/orm/ticker.rs | 2 +- entities/src/services/market.rs | 2 +- entities/src/services/ticker.rs | 2 +- grpc/src/lib.rs | 7 +- grpc/src/server.rs | 2 +- grpc/src/services/market.rs | 2 +- grpc/src/services/ticker.rs | 2 +- rustlers/src/lib.rs | 5 +- rustlers/src/rustlerjar.rs | 34 ++++--- rustlers/src/rustlers.rs | 81 +++++++++++----- rustlers/src/rustlers/binance.rs | 10 +- rustlers/src/svc.rs | 158 +++++++++++++++++++------------ 14 files changed, 206 insertions(+), 129 deletions(-) diff --git a/app/main.rs b/app/main.rs index 4507978..7642020 100644 --- a/app/main.rs +++ b/app/main.rs @@ -1,7 +1,8 @@ use { dotenvy::dotenv, eyre::Result, - lool::logger::{info, ConsoleLogger, Level}, tokio::{join, select}, + lool::logger::{info, ConsoleLogger, Level}, + tokio::join, }; // TODO: here we will trigger the start of both the grpc server and the websocket gateway @@ -13,11 +14,11 @@ async fn main() -> Result<()> { dotenv()?; let conn = entities::db::get_connection().await?; - let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone()).await; + let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone(), None).await; - let (grpc_res, rustlers_res) = join! { + let (_grpc_res, _rustlers_res) = join! { grpc::server::start(conn.clone()), - rustler.start(), + rustler.start(), }; info!("Shutting down"); diff --git a/entities/src/lib.rs b/entities/src/lib.rs index 1d9a0a1..d5ea9ea 100644 --- a/entities/src/lib.rs +++ b/entities/src/lib.rs @@ -1,25 +1,27 @@ pub use sea_orm; mod orm { - #[path = "market.rs"] pub mod market; - #[path = "ticker.rs"] pub mod ticker; + #[path = "market.rs"] + pub mod market; + #[path = "ticker.rs"] + pub mod ticker; } mod services { - #[path = "market.rs"] pub mod market; - #[path = "ticker.rs"] pub mod ticker; + #[path = "market.rs"] + pub mod market; + #[path = "ticker.rs"] + pub mod ticker; } /// market entities and services pub mod market { - pub use super::orm::market::*; - pub use super::services::market::*; + pub use super::{orm::market::*, services::market::*}; } /// ticker entities and services pub mod ticker { - pub use super::orm::ticker::*; - pub use super::services::ticker::*; + pub use super::{orm::ticker::*, services::ticker::*}; } /// database connection stuff @@ -30,7 +32,6 @@ pub mod db { sea_orm::{ ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement, }, - std::sync::Arc, }; const RUSTLER_DATABASE: &str = "RUSTLER_DATABASE"; diff --git a/entities/src/orm/ticker.rs b/entities/src/orm/ticker.rs index 676ad4e..2e3e94c 100644 --- a/entities/src/orm/ticker.rs +++ b/entities/src/orm/ticker.rs @@ -29,4 +29,4 @@ impl Related for Entity { } } -impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file +impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/services/market.rs b/entities/src/services/market.rs index a04a8d3..72177b5 100644 --- a/entities/src/services/market.rs +++ b/entities/src/services/market.rs @@ -4,7 +4,7 @@ use { ticker::{Entity as Ticker, Model as TickerModel}, }, eyre::Result, - sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel} + sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel}, }; pub struct Service { diff --git a/entities/src/services/ticker.rs b/entities/src/services/ticker.rs index bdf41d7..77c5494 100644 --- a/entities/src/services/ticker.rs +++ b/entities/src/services/ticker.rs @@ -4,7 +4,7 @@ use { ticker::{Entity as Ticker, Model as TickerModel}, }, eyre::Result, - sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter} + sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter}, }; pub struct Service { diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index 193891e..6d1d02f 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -1,5 +1,8 @@ mod services { - use entities::sea_orm::{DbErr, SqlErr}; + use { + entities::sea_orm::{DbErr, SqlErr}, + lool::s, + }; pub mod market; pub mod ticker; @@ -12,7 +15,7 @@ mod services { tonic::Status::already_exists(format!("{} already exists", entity_name)) } Some(SqlErr::ForeignKeyConstraintViolation(_)) => { - tonic::Status::failed_precondition(format!("Related entity does not exist")) + tonic::Status::failed_precondition(s!("Related entity does not exist")) } _ => tonic::Status::internal(format!("Error {} {}", action, entity_name)), } diff --git a/grpc/src/server.rs b/grpc/src/server.rs index 1ddb0b1..9cb75c5 100644 --- a/grpc/src/server.rs +++ b/grpc/src/server.rs @@ -3,7 +3,7 @@ use { entities::{market, sea_orm::DatabaseConnection, ticker}, eyre::Result, lool::{cli::stylize::Stylize, logger::info}, - std::{net::SocketAddr, sync::Arc}, + std::net::SocketAddr, tonic::transport::Server, }; diff --git a/grpc/src/services/market.rs b/grpc/src/services/market.rs index dcd4d84..74068ca 100644 --- a/grpc/src/services/market.rs +++ b/grpc/src/services/market.rs @@ -73,7 +73,7 @@ impl MarketApi for GrpcServer { let response = match result { Ok(mkts) => Ok(Response::new(Markets { - markets: mkts.to_owned().into_iter().map(Market::from_model).collect(), + markets: mkts.iter().cloned().map(Market::from_model).collect(), })), Err(err) => Err(handle_sql_err(err, "Getting", "markets")), }; diff --git a/grpc/src/services/ticker.rs b/grpc/src/services/ticker.rs index 98084fa..cca1fb4 100644 --- a/grpc/src/services/ticker.rs +++ b/grpc/src/services/ticker.rs @@ -79,7 +79,7 @@ impl TickerApi for GrpcServer { let response = match result { Ok(mkts) => Ok(Response::new(Tickers { - tickers: mkts.to_owned().into_iter().map(Ticker::from_model).collect(), + tickers: mkts.iter().cloned().map(Ticker::from_model).collect(), })), Err(err) => Err(handle_sql_err(err, "Getting", "tickers")), }; diff --git a/rustlers/src/lib.rs b/rustlers/src/lib.rs index 0b463dc..cc08e09 100644 --- a/rustlers/src/lib.rs +++ b/rustlers/src/lib.rs @@ -1,6 +1,3 @@ -pub mod rustlers; pub mod rustlerjar; +pub mod rustlers; pub mod svc; - -use entities::{market, sea_orm::DatabaseConnection, ticker}; - diff --git a/rustlers/src/rustlerjar.rs b/rustlers/src/rustlerjar.rs index 1a17438..62593df 100644 --- a/rustlers/src/rustlerjar.rs +++ b/rustlers/src/rustlerjar.rs @@ -1,12 +1,15 @@ -use entities::market; -use std::collections::HashMap; -use crate::rustlers::Rustler; +use { + crate::rustlers::Rustler, + entities::market, + std::{collections::HashMap, sync::Arc}, + tokio::sync::Mutex, +}; /// **🤠 » rustlerjar! macro** -/// +/// /// A macro to create a `RustlerJar` with multiple Rustler instances and their corresponding /// mappings. -/// +/// /// **Usage** /// /// ```rust @@ -34,47 +37,52 @@ macro_rules! rustlerjar { } /// **🤠 » RustlerJar** -/// +/// /// A `RustlerJar` is a collection of Rustlers and their corresponding mappings to the markets. /// Which indicates which Rustler should be used for a given market. Rustlers are stored as /// instances of `Box`, and the mappings are stored as a `HashMap` ( /// where the key is the market short name and the value is the Rustler name). -/// +/// /// **Usage** -/// +/// /// The easiest way to create a `RustlerJar` is by using the `rustlerjar!` macro. /// ```rust /// let rustler_jar = rustlerjar! { /// "NYSE", "NASDAQ" => FooRustler, /// "BINANCE" => BarRustler, /// }; -/// +/// /// let rustler = rustler_jar.get(&market); /// ``` pub struct RustlerJar { - rustlers: HashMap>, + rustlers: HashMap>>>, mappings: HashMap, } impl RustlerJar { /// create a new `RustlerJar` with the given Rustlers and mappings. - /// + /// /// **☢️ warn**: using the `rustlerjar!` macro is recommended pub fn new(rustlers_list: Vec>, mappings: HashMap) -> Self { let mut rustlers = HashMap::new(); for rustler in rustlers_list { - rustlers.insert(rustler.name(), rustler); + rustlers.insert(rustler.name(), Arc::new(Mutex::new(rustler))); } Self { rustlers, mappings } } /// get the Rustler for the given market - pub fn get(&self, market: &market::Model) -> Option<&Box> { + pub fn get(&self, market: &market::Model) -> Option<&Arc>>> { let key = self.get_key(market); self.rustlers.get(key) } + pub fn get_mut(&mut self, market: &market::Model) -> Option<&mut Arc>>> { + let key = self.get_key(market).to_owned(); + self.rustlers.get_mut(&key) + } + /// get the key from the mappings for the given market fn get_key(&self, market: &market::Model) -> &str { self.mappings.get(&market.short_name).unwrap() diff --git a/rustlers/src/rustlers.rs b/rustlers/src/rustlers.rs index 2599d1e..44b0dce 100644 --- a/rustlers/src/rustlers.rs +++ b/rustlers/src/rustlers.rs @@ -1,16 +1,15 @@ pub mod binance; +pub extern crate chrono; +pub extern crate eyre; use { async_trait::async_trait, chrono::{DateTime, Local}, entities::{market, ticker}, - std::collections::HashMap, eyre::Result, + std::collections::HashMap, }; -pub extern crate eyre; -pub extern crate chrono; - #[derive(Debug, Clone, PartialEq, Eq, Default)] pub enum RustlerStatus { Connecting, @@ -73,6 +72,14 @@ impl Ticker { market: m.short_name.clone(), } } + + pub fn many_from(tickers: &[ticker::Model], market: &market::Model) -> Vec { + tickers.iter().map(|t| Self::from(t, market)).collect() + } + + pub fn key(&self) -> String { + format!("{}:{}", self.market, self.symbol) + } } pub trait RustlerAccessor { @@ -117,9 +124,9 @@ pub trait RustlerAccessor { pub trait Rustler: RustlerAccessor + Send + Sync { // #region Unimplemented trait functions /// fn called after tickers are added to the rustler - fn on_add(&mut self, tickers: Vec) -> Result<()>; + fn on_add(&mut self, tickers: &[Ticker]) -> Result<()>; /// fn called after tickers are deleted from the rustler - fn on_delete(&mut self, tickers: Vec) -> Result<()>; + fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()>; /// connects the rustler to the data source async fn connect(&mut self) -> Result<()>; /// disconnects the rustler from the data source @@ -165,16 +172,16 @@ pub trait Rustler: RustlerAccessor + Send + Sync { } /// adds new tickers to the rustler - async fn add(&mut self, new_tickers: Vec) -> Result<()> { + async fn add(&mut self, new_tickers: &Vec) -> Result<()> { let tickers = self.tickers_mut(); - for new_ticker in &new_tickers { + for new_ticker in new_tickers { // if the ticker already exists in the tickers map, skip it - if tickers.contains_key(&new_ticker.symbol) { + if tickers.contains_key(&new_ticker.key()) { continue; } - tickers.insert(new_ticker.symbol.clone(), new_ticker.clone()); + tickers.insert(new_ticker.key(), new_ticker.clone()); } if self.opts().connect_on_add { @@ -189,11 +196,11 @@ pub trait Rustler: RustlerAccessor + Send + Sync { } /// deletes tickers from the rustler - async fn delete(&mut self, new_tickers: Vec) -> Result<()> { + async fn delete(&mut self, new_tickers: &Vec) -> Result<()> { let tickers = self.tickers_mut(); - for new_ticker in &new_tickers { - tickers.remove(&new_ticker.symbol); + for new_ticker in new_tickers { + tickers.remove(&new_ticker.key()); } // if after deleting the tickers the tickers map is @@ -232,7 +239,10 @@ macro_rules! rustler_accessors { fn status(&self) -> &$crate::rustlers::RustlerStatus { &self.status } - fn set_status(&mut self, status: $crate::rustlers::RustlerStatus) -> $crate::rustlers::eyre::Result<()> { + fn set_status( + &mut self, + status: $crate::rustlers::RustlerStatus, + ) -> $crate::rustlers::eyre::Result<()> { self.status = status; self.handle_status_change()?; @@ -247,31 +257,56 @@ macro_rules! rustler_accessors { fn next_run(&self) -> &$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local> { &self.next_run } - fn set_next_run(&mut self, next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>) { + fn set_next_run( + &mut self, + next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>, + ) { self.next_run = next_run; } - fn next_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + fn next_stop( + &self, + ) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { &self.next_stop } - fn set_next_stop(&mut self, next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + fn set_next_stop( + &mut self, + next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + ) { self.next_stop = next_stop; } - fn last_run(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + fn last_run( + &self, + ) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { &self.last_run } - fn set_last_run(&mut self, last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + fn set_last_run( + &mut self, + last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + ) { self.last_run = last_run; } - fn last_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + fn last_stop( + &self, + ) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { &self.last_stop } - fn set_last_stop(&mut self, last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + fn set_last_stop( + &mut self, + last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + ) { self.last_stop = last_stop; } - fn last_update(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + fn last_update( + &self, + ) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { &self.last_update } - fn set_last_update(&mut self, last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + fn set_last_update( + &mut self, + last_update: Option< + $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>, + >, + ) { self.last_update = last_update; } fn opts(&self) -> &$crate::rustlers::RustlerOpts { diff --git a/rustlers/src/rustlers/binance.rs b/rustlers/src/rustlers/binance.rs index 28c15f2..eba4938 100644 --- a/rustlers/src/rustlers/binance.rs +++ b/rustlers/src/rustlers/binance.rs @@ -1,9 +1,7 @@ use { crate::{ rustler, - rustlers::{ - Rustler, RustlerAccessor, RustlerStatus, Ticker - }, + rustlers::{Rustler, RustlerAccessor, RustlerStatus, Ticker}, }, async_trait::async_trait, eyre::Result, @@ -23,7 +21,7 @@ rustler!( ); impl BinanceRustler { - pub fn new() -> impl Rustler { + pub fn create() -> impl Rustler { Self::default() } } @@ -51,13 +49,13 @@ impl Rustler for BinanceRustler { Ok(()) } - fn on_add(&mut self, tickers: Vec) -> Result<()> { + fn on_add(&mut self, tickers: &[Ticker]) -> Result<()> { info!("Adding tickers: {:?}", tickers); Ok(()) } - fn on_delete(&mut self, tickers: Vec) -> Result<()> { + fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()> { info!("Deleting tickers: {:?}", tickers); Ok(()) diff --git a/rustlers/src/svc.rs b/rustlers/src/svc.rs index b2639ab..427642a 100644 --- a/rustlers/src/svc.rs +++ b/rustlers/src/svc.rs @@ -1,16 +1,21 @@ -use entities::{market, sea_orm::DatabaseConnection, ticker}; -use eyre::{Ok, Result}; -use lool::{ - logger::{info, warn}, - s, - sched::{ - recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet, - SchedulingRule, +use { + entities::{market, sea_orm::DatabaseConnection, ticker}, + eyre::Result, + lool::{ + logger::{info, warn}, + sched::{ + recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet, + SchedulingRule, + }, }, + std::sync::Arc, + tokio::sync::Mutex, }; -use std::collections::HashMap; -use crate::rustlers::{Rustler, Ticker}; +use crate::{ + rustlerjar::RustlerJar, + rustlers::{Rustler, Ticker}, +}; // interface MarketExecData { // entity: MarketModel; @@ -23,27 +28,28 @@ use crate::rustlers::{Rustler, Ticker}; // // subscription?: Subscription // }]] -struct MarketExecData { - // entity: MarketModel, - // startJob?: Job, - // stopJob?: Job, -} - -type RustlerFactory = Box Option>>; +// struct MarketExecData { +// entity: MarketModel, +// startJob?: Job, +// stopJob?: Job, +// } pub struct RustlersSvc { market_svc: market::Service, - factory: RustlerFactory, + sched: Scheduler, + rustlers: RustlerJar, } impl RustlersSvc { - pub async fn new(conn: DatabaseConnection) -> Self { + /// creates a new instance of the `RustlersSvc` + pub async fn new(conn: DatabaseConnection, rustlers: Option) -> Self { let market_svc = market::Service::new(conn).await; - let factory = |_mkt: &market::Model| None; + let sched = Scheduler::new(); Self { market_svc, - factory: Box::new(factory), + rustlers: rustlers.unwrap(), + sched, } } @@ -54,7 +60,7 @@ impl RustlersSvc { let markets = self.market_svc.get_all_with_tickers().await?; for (market, tickers) in markets { - self.start_rustler_for((market, tickers)).await?; + self.schedule_rustler_for((market, tickers)).await?; } Ok(()) @@ -65,29 +71,45 @@ impl RustlersSvc { todo!() } - /// gets the corresponding rustler for the given market and starts it + /// gets the right rustler for the given market and starts it /// /// depending on the market configuraation, the rustler might be started /// immediately or its start might be scheduled for a later time /// /// this function also schedules the stop of the rustler at the end of the market /// trading hours if the market is configured to stop at a specific time - async fn start_rustler_for( + async fn schedule_rustler_for( &mut self, market: (market::Model, Vec), ) -> Result<()> { let (market, tickers) = market; + let tickers: Vec = tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); + let rules = self.get_schedule_rules_for(&market)?; - let rustler = (self.factory)(&market); + let rustler = self.rustlers.get(&market); if let Some(rustler) = rustler { if let Some((start, end)) = rules { - let mut sched = Scheduler::new(); - let start_name = format!("start-{}", market.short_name); - let end_name = format!("end-{}", market.short_name); + let start_name = format!("start-rustler-{}", market.short_name); + let end_name = format!("end-rustler-{}", market.short_name); - let start_job = sched.schedule(start_name, || async move {}, start).await; - let end_job = sched.schedule(end_name, || async move {}, end).await; + let _start_job = self + .sched + .schedule_fut( + start_name.to_owned(), + Self::start_rustler_for(rustler.clone(), tickers.clone()), + start, + ) + .await; + + let _end_job = self + .sched + .schedule_fut( + end_name.to_owned(), + Self::stop_rustler_for(rustler.clone(), tickers), + end, + ) + .await; Ok(()) } else { @@ -123,44 +145,56 @@ impl RustlersSvc { Ok(Some((recur(&start_rule), recur(&stop_rule)))) } - /// get the rustler according to the market - fn get_rustler_for(&mut self, market: &market::Model) -> Option> { - let scrapper = (self.factory)(market); - - - scrapper - } - /// starts a rustler by adding the tickers to it - async fn start_rustler( - rustler: &mut Box, - market: market::Model, - tickers: Vec, - ) -> Result<()> { - if tickers.len() > 0 { - let tickers: Vec = - tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); + async fn start_rustler_for(rustler: Arc>>, tickers: Vec) { + let mut rustler = rustler.lock().await; + match rustler.start().await { + Ok(()) => { + if !tickers.is_empty() { + info!("Rustler {} started for market", rustler.name()); - rustler.add(tickers).await?; - } - - Ok(()) + match rustler.add(&tickers).await { + Ok(()) => info!( + "Tickers {:?} added to rustler '{}'", + tickers, + rustler.name() + ), + Err(e) => warn!( + "Failed to add tickers to rustler '{}': {}", + rustler.name(), + e + ), + } + } + } + Err(e) => warn!("Failed to start rustler '{}': {}", rustler.name(), e), + }; } - /// stops a rustler by deleting all its tickers - async fn stop_rustler( - rustler: &mut Box, - market: market::Model, - tickers: Vec, - ) -> Result<()> { - if tickers.len() > 0 { - let tickers: Vec = - tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); + /// stops a rustler for the given market/tickers + /// + /// if the rustler is being used by other markets, or the ticker list does not contain + /// all the tickers that the rustler is using for the given market, the rustler will not + /// be stopped, but will stop gathering data for the given tickers. + async fn stop_rustler_for(rustler: Arc>>, tickers: Vec) { + let mut rustler = rustler.lock().await; - rustler.delete(tickers).await?; + if !tickers.is_empty() { + // we delete the tickers from the rustler, but it will still be running if + // there are other markets using the same rustler. + match rustler.delete(&tickers).await { + Ok(()) => info!( + "Tickers {:?} removed from rustler '{}'", + tickers, + rustler.name() + ), + Err(e) => warn!( + "Failed to remove tickers from rustler '{}': {}", + rustler.name(), + e + ), + } } - - Ok(()) } }