feat: start rustler on startup when necessary

This commit is contained in:
Lucas Colombo 2024-04-23 06:36:28 -03:00
parent cbf6ba1f5a
commit 6871e656f5
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
5 changed files with 75 additions and 12 deletions

View File

@ -16,7 +16,7 @@ tasks:
run:watch: run:watch:
desc: 🚀 watch rustler desc: 🚀 watch rustler
cmds: cmds:
- cargo watch -c -x "run -- --version" - cargo watch -c -x "run"
build:watch: build:watch:
desc: 🚀 watch rustler «build» desc: 🚀 watch rustler «build»

View File

@ -2,6 +2,7 @@ use {
dotenvy::dotenv, dotenvy::dotenv,
eyre::Result, eyre::Result,
lool::logger::{info, ConsoleLogger, Level}, lool::logger::{info, ConsoleLogger, Level},
rustlers::{rustlerjar, rustlers::binance::BinanceRustler, svc::RustlersSvc},
tokio::join, tokio::join,
}; };
@ -14,7 +15,13 @@ async fn main() -> Result<()> {
dotenv()?; dotenv()?;
let conn = entities::db::get_connection().await?; let conn = entities::db::get_connection().await?;
let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone(), None).await; let mut rustler = RustlersSvc::new(
conn.clone(),
rustlerjar! {
"BINANCE" => BinanceRustler
},
)
.await;
let (_grpc_res, _rustlers_res) = join! { let (_grpc_res, _rustlers_res) = join! {
grpc::server::start(conn.clone()), grpc::server::start(conn.clone()),

View File

@ -21,11 +21,13 @@ use {
#[macro_export] #[macro_export]
macro_rules! rustlerjar { macro_rules! rustlerjar {
($($($name:expr),* => $rustler:ident),* $(,)?) => {{ ($($($name:expr),* => $rustler:ident),* $(,)?) => {{
let mut instances: Vec<Box<dyn Rustler>> = Vec::new(); use $crate::rustlers::RustlerAccessor;
let mut instances: Vec<Box<dyn $crate::rustlers::Rustler>> = Vec::new();
let mut mappings = std::collections::HashMap::new(); let mut mappings = std::collections::HashMap::new();
$( $(
let instance = Box::new($rustler::new()); let instance = Box::new($rustler::create());
$( $(
mappings.insert($name.to_string(), instance.name()); mappings.insert($name.to_string(), instance.name());
)* )*

View File

@ -40,12 +40,23 @@ impl Rustler for BinanceRustler {
BINANCE_WSS_URL.bright_green() BINANCE_WSS_URL.bright_green()
); );
self.set_status(RustlerStatus::Connected)?;
Ok(()) Ok(())
} }
async fn disconnect(&mut self) -> Result<()> { async fn disconnect(&mut self) -> Result<()> {
if self.status == RustlerStatus::Disconnected || self.status == RustlerStatus::Disconnecting
{
return Ok(());
}
self.set_status(RustlerStatus::Disconnecting)?;
info!("Disconnecting from Binance WSS"); info!("Disconnecting from Binance WSS");
self.set_status(RustlerStatus::Disconnected)?;
Ok(()) Ok(())
} }

View File

@ -42,13 +42,13 @@ pub struct RustlersSvc {
impl RustlersSvc { impl RustlersSvc {
/// creates a new instance of the `RustlersSvc` /// creates a new instance of the `RustlersSvc`
pub async fn new(conn: DatabaseConnection, rustlers: Option<RustlerJar>) -> Self { pub async fn new(conn: DatabaseConnection, rustlers: RustlerJar) -> Self {
let market_svc = market::Service::new(conn).await; let market_svc = market::Service::new(conn).await;
let sched = Scheduler::new(); let sched = Scheduler::new();
Self { Self {
market_svc, market_svc,
rustlers: rustlers.unwrap(), rustlers,
sched, sched,
} }
} }
@ -68,6 +68,11 @@ impl RustlersSvc {
/// stops all rustlers and then starts them again /// stops all rustlers and then starts them again
pub async fn restart(&self) -> Result<()> { pub async fn restart(&self) -> Result<()> {
// TODO: restart all rustlers; this method should clear everything we set up about the
// rustlers and then call `start` again. here we will need access to the job handlers
// which we are not storing right now, so we're going to need to store them in the
// `RustlersSvc` struct
todo!() todo!()
} }
@ -89,28 +94,47 @@ impl RustlersSvc {
let rustler = self.rustlers.get(&market); let rustler = self.rustlers.get(&market);
if let Some(rustler) = rustler { if let Some(rustler) = rustler {
if let Some((start, end)) = rules { if let Some((start, stop)) = rules {
let start_name = format!("start-rustler-{}", market.short_name); let start_name = format!("start-rustler-{}", market.short_name);
let end_name = format!("end-rustler-{}", market.short_name); let end_name = format!("end-rustler-{}", market.short_name);
let _start_job = self // TODO: we will need to store the job handlers in the `RustlersSvc` struct
// so that we can stop them when we need to restart the rustlers
let start_job = self
.sched .sched
.schedule_fut( .schedule_fut(
start_name.to_owned(), start_name.to_owned(),
Self::start_rustler_for(rustler.clone(), tickers.clone()), Self::start_rustler_for(rustler.clone(), tickers.clone()),
start, start.clone(),
) )
.await; .await;
let _end_job = self let end_job = self
.sched .sched
.schedule_fut( .schedule_fut(
end_name.to_owned(), end_name.to_owned(),
Self::stop_rustler_for(rustler.clone(), tickers), Self::stop_rustler_for(rustler.clone(), tickers.clone()),
end, stop.clone(),
) )
.await; .await;
info!(
"Scheduled next execution for start job {start_name} for market '{}' at {:?}",
market.short_name,
start_job.get_next_run()
);
info!(
"Scheduled next execution for stop job {end_name} for market '{}' at {:?}",
market.short_name,
end_job.get_next_run()
);
if should_be_running_now(start, stop) {
info!("Starting '{start_name}' right away");
Self::start_rustler_for(rustler.clone(), tickers).await;
}
Ok(()) Ok(())
} else { } else {
warn!("No schedule rules found for market '{}'", market.short_name); warn!("No schedule rules found for market '{}'", market.short_name);
@ -219,3 +243,22 @@ enum Op {
Add, Add,
Sub, Sub,
} }
/// checks if the rustler should be running now
fn should_be_running_now(start: SchedulingRule, stop: SchedulingRule) -> bool {
let now = chrono::Local::now();
let start_date = start.next_from(now);
let stop_date = stop.next_from(now);
// if start date is Some in the past and stop_date is None, we should be running
if start_date.is_some() && stop_date.is_none() {
return true;
}
match (start_date, stop_date) {
(Some(start), Some(stop)) => stop < start && now < stop,
_ => true,
}
}