From 6871e656f5c40f27ded7f2a5954fd4fa666684db Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Tue, 23 Apr 2024 06:36:28 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20start=20rustler=20on=20star?= =?UTF-8?q?tup=20when=20necessary?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Taskfile.yaml | 2 +- app/main.rs | 9 ++++- rustlers/src/rustlerjar.rs | 6 ++-- rustlers/src/rustlers/binance.rs | 11 ++++++ rustlers/src/svc.rs | 59 +++++++++++++++++++++++++++----- 5 files changed, 75 insertions(+), 12 deletions(-) diff --git a/Taskfile.yaml b/Taskfile.yaml index 4cec3b4..26987e0 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -16,7 +16,7 @@ tasks: run:watch: desc: 🚀 watch rustler cmds: - - cargo watch -c -x "run -- --version" + - cargo watch -c -x "run" build:watch: desc: 🚀 watch rustler «build» diff --git a/app/main.rs b/app/main.rs index 7642020..d852476 100644 --- a/app/main.rs +++ b/app/main.rs @@ -2,6 +2,7 @@ use { dotenvy::dotenv, eyre::Result, lool::logger::{info, ConsoleLogger, Level}, + rustlers::{rustlerjar, rustlers::binance::BinanceRustler, svc::RustlersSvc}, tokio::join, }; @@ -14,7 +15,13 @@ async fn main() -> Result<()> { dotenv()?; let conn = entities::db::get_connection().await?; - let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone(), None).await; + let mut rustler = RustlersSvc::new( + conn.clone(), + rustlerjar! { + "BINANCE" => BinanceRustler + }, + ) + .await; let (_grpc_res, _rustlers_res) = join! { grpc::server::start(conn.clone()), diff --git a/rustlers/src/rustlerjar.rs b/rustlers/src/rustlerjar.rs index 62593df..3283c52 100644 --- a/rustlers/src/rustlerjar.rs +++ b/rustlers/src/rustlerjar.rs @@ -21,11 +21,13 @@ use { #[macro_export] macro_rules! rustlerjar { ($($($name:expr),* => $rustler:ident),* $(,)?) => {{ - let mut instances: Vec> = Vec::new(); + use $crate::rustlers::RustlerAccessor; + + let mut instances: Vec> = Vec::new(); let mut mappings = std::collections::HashMap::new(); $( - let instance = Box::new($rustler::new()); + let instance = Box::new($rustler::create()); $( mappings.insert($name.to_string(), instance.name()); )* diff --git a/rustlers/src/rustlers/binance.rs b/rustlers/src/rustlers/binance.rs index eba4938..a34d249 100644 --- a/rustlers/src/rustlers/binance.rs +++ b/rustlers/src/rustlers/binance.rs @@ -40,12 +40,23 @@ impl Rustler for BinanceRustler { BINANCE_WSS_URL.bright_green() ); + self.set_status(RustlerStatus::Connected)?; + Ok(()) } async fn disconnect(&mut self) -> Result<()> { + if self.status == RustlerStatus::Disconnected || self.status == RustlerStatus::Disconnecting + { + return Ok(()); + } + + self.set_status(RustlerStatus::Disconnecting)?; + info!("Disconnecting from Binance WSS"); + self.set_status(RustlerStatus::Disconnected)?; + Ok(()) } diff --git a/rustlers/src/svc.rs b/rustlers/src/svc.rs index 427642a..8b9b7cd 100644 --- a/rustlers/src/svc.rs +++ b/rustlers/src/svc.rs @@ -42,13 +42,13 @@ pub struct RustlersSvc { impl RustlersSvc { /// creates a new instance of the `RustlersSvc` - pub async fn new(conn: DatabaseConnection, rustlers: Option) -> Self { + pub async fn new(conn: DatabaseConnection, rustlers: RustlerJar) -> Self { let market_svc = market::Service::new(conn).await; let sched = Scheduler::new(); Self { market_svc, - rustlers: rustlers.unwrap(), + rustlers, sched, } } @@ -68,6 +68,11 @@ impl RustlersSvc { /// stops all rustlers and then starts them again pub async fn restart(&self) -> Result<()> { + // TODO: restart all rustlers; this method should clear everything we set up about the + // rustlers and then call `start` again. here we will need access to the job handlers + // which we are not storing right now, so we're going to need to store them in the + // `RustlersSvc` struct + todo!() } @@ -89,28 +94,47 @@ impl RustlersSvc { let rustler = self.rustlers.get(&market); if let Some(rustler) = rustler { - if let Some((start, end)) = rules { + if let Some((start, stop)) = rules { let start_name = format!("start-rustler-{}", market.short_name); let end_name = format!("end-rustler-{}", market.short_name); - let _start_job = self + // TODO: we will need to store the job handlers in the `RustlersSvc` struct + // so that we can stop them when we need to restart the rustlers + + let start_job = self .sched .schedule_fut( start_name.to_owned(), Self::start_rustler_for(rustler.clone(), tickers.clone()), - start, + start.clone(), ) .await; - let _end_job = self + let end_job = self .sched .schedule_fut( end_name.to_owned(), - Self::stop_rustler_for(rustler.clone(), tickers), - end, + Self::stop_rustler_for(rustler.clone(), tickers.clone()), + stop.clone(), ) .await; + info!( + "Scheduled next execution for start job {start_name} for market '{}' at {:?}", + market.short_name, + start_job.get_next_run() + ); + info!( + "Scheduled next execution for stop job {end_name} for market '{}' at {:?}", + market.short_name, + end_job.get_next_run() + ); + + if should_be_running_now(start, stop) { + info!("Starting '{start_name}' right away"); + Self::start_rustler_for(rustler.clone(), tickers).await; + } + Ok(()) } else { warn!("No schedule rules found for market '{}'", market.short_name); @@ -219,3 +243,22 @@ enum Op { Add, Sub, } + +/// checks if the rustler should be running now +fn should_be_running_now(start: SchedulingRule, stop: SchedulingRule) -> bool { + let now = chrono::Local::now(); + + let start_date = start.next_from(now); + let stop_date = stop.next_from(now); + + // if start date is Some in the past and stop_date is None, we should be running + + if start_date.is_some() && stop_date.is_none() { + return true; + } + + match (start_date, stop_date) { + (Some(start), Some(stop)) => stop < start && now < stop, + _ => true, + } +}