use {
super::{
rustler::{Rustler, Ticker},
rustlerjar::RustlerJar,
MarketHourType,
},
crate::{
bus::PublisherTrait,
entities::{market, sea_orm::DatabaseConnection, ticker},
rustlers::Quote,
},
eyre::Result,
lool::{
fail,
logger::{info, warn},
sched::{
recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet,
SchedulingRule,
},
},
std::sync::Arc,
tokio::sync::{mpsc::Sender, Mutex},
};
/// #### Rustler Message
///
/// Message type received on the channel that `RustlersSvc::start` drains;
/// every received message is matched and its payload handed to the publisher.
pub enum RustlerMsg {
/// Carries a single `Quote`.
QuoteMsg(Quote),
}
/// #### create a quote message
///
/// Assembles a `Quote` from its individual fields and wraps it in
/// `RustlerMsg::QuoteMsg`.
///
/// **Arguments**
/// - `id` - stored as the quote's `id`
/// - `market` - stored as the quote's `market`
/// - `price` - stored as the quote's `price`
/// - `change_percent` - stored as the quote's `change_percent`
/// - `time` - stored as the quote's `time`
/// - `market_hours` - stored as the quote's `market_hours`
///
/// **Returns**
/// a `RustlerMsg::QuoteMsg` holding the assembled `Quote`
#[inline]
pub fn quote(
    id: String,
    market: String,
    price: f64,
    change_percent: f64,
    time: i64,
    market_hours: MarketHourType,
) -> RustlerMsg {
    // build the payload first so the wrapping step reads on its own
    let payload = Quote {
        id,
        market,
        price,
        change_percent,
        time,
        market_hours,
    };

    RustlerMsg::QuoteMsg(payload)
}
/// #### create a quote message from a `Quote`
///
/// Takes ownership of `quote` and returns it wrapped in `RustlerMsg::QuoteMsg`.
#[inline]
pub fn to_msg(quote: Quote) -> RustlerMsg {
RustlerMsg::QuoteMsg(quote)
}
/// #### Rustlers Service
///
/// `RustlersSvc` is a service that manages the rustlers and orchestrates their executions.
///
/// **Type parameters**
/// - `P` - the publisher used to forward the quotes received from the rustlers
pub struct RustlersSvc<P>
where
    P: PublisherTrait + Send + Sync + 'static + Clone,
{
    /// service used to load the markets together with their tickers
    market_svc: market::Service,
    /// scheduler on which the start/stop jobs of the rustlers are registered
    sched: Scheduler,
    /// collection from which the rustler for a given market is looked up
    rustlers: RustlerJar,
    /// publisher that receives every quote coming from the rustlers
    publisher: P,
}
impl RustlersSvc
where
Publisher: PublisherTrait + Send + Sync + 'static + Clone,
{
/// #### create service
///
/// builds a new `RustlersSvc`, creating the market service from the given
/// connection and a fresh scheduler
///
/// **Arguments**
/// - `conn` - the database connection handed to the market service
/// - `rustlers` - the rustlers to be used by the service
/// - `publisher` - the publisher stored in the service
///
/// **Returns**
/// the created `RustlersSvc` instance
pub async fn new(conn: DatabaseConnection, rustlers: RustlerJar, publisher: Publisher) -> Self {
    Self {
        // the market service is created first, exactly as before
        market_svc: market::Service::new(conn).await,
        sched: Scheduler::new(),
        rustlers,
        publisher,
    }
}
/// #### start rustlers
///
/// loads every market with its tickers, schedules the rustler of each market
/// and then forwards every quote received from the rustlers to the publisher
///
/// **Errors**
/// - when loading the markets fails
/// - when no markets are returned (`No markets found`)
/// - when scheduling a rustler fails
/// - when publishing a quote fails
/// - when the message channel yields no more messages (`Rustlers stopped`)
pub async fn start(&mut self) -> Result<()> {
    info!("Starting rustlers");

    let markets = self.market_svc.get_all_with_tickers().await?;

    if markets.is_empty() {
        fail!("No markets found")
    } else {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(100);

        // every rustler gets its own clone of the sending half
        for entry in markets {
            self.schedule_rustler_for(entry, sender.clone()).await?;
        }

        // NOTE: if we wanted to stop all the rustlers for good for some reason, we should
        // use a select! instead and listen for a stop signal coming from somewhere
        //
        // NOTE(review): `sender` is still alive in this scope while the loop runs, so
        // `recv` presumably never yields `None` here; confirm whether that is intended
        let mut publisher = self.publisher.clone();
        while let Some(msg) = receiver.recv().await {
            let RustlerMsg::QuoteMsg(quote) = msg;
            publisher.publish(quote).await?;
        }

        fail!("Rustlers stopped")
    }
}
/// #### restart rustlers
///
/// intended to stop all rustlers and then start them again
///
/// **Panics**
/// always, for now: the body is still a `todo!()` placeholder
pub async fn restart(&self) -> Result<()> {
// TODO: restart all rustlers; this method should clear everything we set up about the
// rustlers and then call `start` again. here we will need access to the job handlers
// which we are not storing right now, so we're going to need to store them in the
// `RustlersSvc` struct
todo!()
}
/// gets the right rustler for the given market and starts it
///
/// depending on the market configuraation, the rustler might be started
/// immediately or its start might be scheduled for a later time
///
/// this function also schedules the stop of the rustler at the end of the market
/// trading hours if the market is configured to stop at a specific time
async fn schedule_rustler_for(
&mut self,
market: (market::Model, Vec),
sender: Sender,
) -> Result<()> {
let (market, tickers) = market;
let tickers: Vec = tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
let rules = self.get_schedule_rules_for(&market)?;
let rustler = self.rustlers.get(&market);
if let Some(rustler) = rustler {
{
let mut rustler = rustler.lock().await;
info!("Setting message sender for rustler '{}'", rustler.name());
rustler.set_msg_sender(Some(sender))
}
let start_name = format!("start-rustler-{}", market.short_name);
let end_name = format!("end-rustler-{}", market.short_name);
if let Some((start, stop)) = &rules {
// TODO: we will need to store the job handlers in the `RustlersSvc` struct
// so that we can stop them when we need to restart the rustlers
let start_job = self
.sched
.schedule_fut(
start_name.to_owned(),
Self::start_rustler_for(rustler.clone(), tickers.clone()),
start.clone(),
)
.await;
let end_job = self
.sched
.schedule_fut(
end_name.to_owned(),
Self::stop_rustler_for(rustler.clone(), tickers.clone()),
stop.clone(),
)
.await;
info!(
"Scheduled next execution for start job {start_name} for market '{}' at {:?}",
market.short_name,
start_job.get_next_run()
);
info!(
"Scheduled next execution for stop job {end_name} for market '{}' at {:?}",
market.short_name,
end_job.get_next_run()
);
} else {
info!("No schedule rules found for market '{}'", market.short_name);
}
if should_be_running_now(rules) {
info!("Starting '{start_name}' right away");
Self::start_rustler_for(rustler.clone(), tickers).await;
}
Ok(())
} else {
warn!("No rustler found for market '{}'", market.short_name);
Ok(())
}
}
/// creates schedule rules for the given market
fn get_schedule_rules_for(
&self,
mkt: &market::Model,
) -> Result