wip: 🚧 rustlers core further development
This commit is contained in:
parent
8e78b16165
commit
cbf6ba1f5a
@ -1,7 +1,8 @@
|
|||||||
use {
|
use {
|
||||||
dotenvy::dotenv,
|
dotenvy::dotenv,
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
lool::logger::{info, ConsoleLogger, Level}, tokio::{join, select},
|
lool::logger::{info, ConsoleLogger, Level},
|
||||||
|
tokio::join,
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: here we will trigger the start of both the grpc server and the websocket gateway
|
// TODO: here we will trigger the start of both the grpc server and the websocket gateway
|
||||||
@ -13,9 +14,9 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
dotenv()?;
|
dotenv()?;
|
||||||
let conn = entities::db::get_connection().await?;
|
let conn = entities::db::get_connection().await?;
|
||||||
let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone()).await;
|
let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone(), None).await;
|
||||||
|
|
||||||
let (grpc_res, rustlers_res) = join! {
|
let (_grpc_res, _rustlers_res) = join! {
|
||||||
grpc::server::start(conn.clone()),
|
grpc::server::start(conn.clone()),
|
||||||
rustler.start(),
|
rustler.start(),
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1,25 +1,27 @@
|
|||||||
pub use sea_orm;
|
pub use sea_orm;
|
||||||
|
|
||||||
mod orm {
|
mod orm {
|
||||||
#[path = "market.rs"] pub mod market;
|
#[path = "market.rs"]
|
||||||
#[path = "ticker.rs"] pub mod ticker;
|
pub mod market;
|
||||||
|
#[path = "ticker.rs"]
|
||||||
|
pub mod ticker;
|
||||||
}
|
}
|
||||||
|
|
||||||
mod services {
|
mod services {
|
||||||
#[path = "market.rs"] pub mod market;
|
#[path = "market.rs"]
|
||||||
#[path = "ticker.rs"] pub mod ticker;
|
pub mod market;
|
||||||
|
#[path = "ticker.rs"]
|
||||||
|
pub mod ticker;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// market entities and services
|
/// market entities and services
|
||||||
pub mod market {
|
pub mod market {
|
||||||
pub use super::orm::market::*;
|
pub use super::{orm::market::*, services::market::*};
|
||||||
pub use super::services::market::*;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// ticker entities and services
|
/// ticker entities and services
|
||||||
pub mod ticker {
|
pub mod ticker {
|
||||||
pub use super::orm::ticker::*;
|
pub use super::{orm::ticker::*, services::ticker::*};
|
||||||
pub use super::services::ticker::*;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// database connection stuff
|
/// database connection stuff
|
||||||
@ -30,7 +32,6 @@ pub mod db {
|
|||||||
sea_orm::{
|
sea_orm::{
|
||||||
ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement,
|
ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement,
|
||||||
},
|
},
|
||||||
std::sync::Arc,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const RUSTLER_DATABASE: &str = "RUSTLER_DATABASE";
|
const RUSTLER_DATABASE: &str = "RUSTLER_DATABASE";
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use {
|
|||||||
ticker::{Entity as Ticker, Model as TickerModel},
|
ticker::{Entity as Ticker, Model as TickerModel},
|
||||||
},
|
},
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel}
|
sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use {
|
|||||||
ticker::{Entity as Ticker, Model as TickerModel},
|
ticker::{Entity as Ticker, Model as TickerModel},
|
||||||
},
|
},
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter}
|
sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
|
|||||||
@ -1,5 +1,8 @@
|
|||||||
mod services {
|
mod services {
|
||||||
use entities::sea_orm::{DbErr, SqlErr};
|
use {
|
||||||
|
entities::sea_orm::{DbErr, SqlErr},
|
||||||
|
lool::s,
|
||||||
|
};
|
||||||
|
|
||||||
pub mod market;
|
pub mod market;
|
||||||
pub mod ticker;
|
pub mod ticker;
|
||||||
@ -12,7 +15,7 @@ mod services {
|
|||||||
tonic::Status::already_exists(format!("{} already exists", entity_name))
|
tonic::Status::already_exists(format!("{} already exists", entity_name))
|
||||||
}
|
}
|
||||||
Some(SqlErr::ForeignKeyConstraintViolation(_)) => {
|
Some(SqlErr::ForeignKeyConstraintViolation(_)) => {
|
||||||
tonic::Status::failed_precondition(format!("Related entity does not exist"))
|
tonic::Status::failed_precondition(s!("Related entity does not exist"))
|
||||||
}
|
}
|
||||||
_ => tonic::Status::internal(format!("Error {} {}", action, entity_name)),
|
_ => tonic::Status::internal(format!("Error {} {}", action, entity_name)),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,7 +3,7 @@ use {
|
|||||||
entities::{market, sea_orm::DatabaseConnection, ticker},
|
entities::{market, sea_orm::DatabaseConnection, ticker},
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
lool::{cli::stylize::Stylize, logger::info},
|
lool::{cli::stylize::Stylize, logger::info},
|
||||||
std::{net::SocketAddr, sync::Arc},
|
std::net::SocketAddr,
|
||||||
tonic::transport::Server,
|
tonic::transport::Server,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -73,7 +73,7 @@ impl MarketApi for GrpcServer {
|
|||||||
|
|
||||||
let response = match result {
|
let response = match result {
|
||||||
Ok(mkts) => Ok(Response::new(Markets {
|
Ok(mkts) => Ok(Response::new(Markets {
|
||||||
markets: mkts.to_owned().into_iter().map(Market::from_model).collect(),
|
markets: mkts.iter().cloned().map(Market::from_model).collect(),
|
||||||
})),
|
})),
|
||||||
Err(err) => Err(handle_sql_err(err, "Getting", "markets")),
|
Err(err) => Err(handle_sql_err(err, "Getting", "markets")),
|
||||||
};
|
};
|
||||||
|
|||||||
@ -79,7 +79,7 @@ impl TickerApi for GrpcServer {
|
|||||||
|
|
||||||
let response = match result {
|
let response = match result {
|
||||||
Ok(mkts) => Ok(Response::new(Tickers {
|
Ok(mkts) => Ok(Response::new(Tickers {
|
||||||
tickers: mkts.to_owned().into_iter().map(Ticker::from_model).collect(),
|
tickers: mkts.iter().cloned().map(Ticker::from_model).collect(),
|
||||||
})),
|
})),
|
||||||
Err(err) => Err(handle_sql_err(err, "Getting", "tickers")),
|
Err(err) => Err(handle_sql_err(err, "Getting", "tickers")),
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1,6 +1,3 @@
|
|||||||
pub mod rustlers;
|
|
||||||
pub mod rustlerjar;
|
pub mod rustlerjar;
|
||||||
|
pub mod rustlers;
|
||||||
pub mod svc;
|
pub mod svc;
|
||||||
|
|
||||||
use entities::{market, sea_orm::DatabaseConnection, ticker};
|
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,9 @@
|
|||||||
use entities::market;
|
use {
|
||||||
use std::collections::HashMap;
|
crate::rustlers::Rustler,
|
||||||
use crate::rustlers::Rustler;
|
entities::market,
|
||||||
|
std::{collections::HashMap, sync::Arc},
|
||||||
|
tokio::sync::Mutex,
|
||||||
|
};
|
||||||
|
|
||||||
/// **🤠 » rustlerjar! macro**
|
/// **🤠 » rustlerjar! macro**
|
||||||
///
|
///
|
||||||
@ -52,7 +55,7 @@ macro_rules! rustlerjar {
|
|||||||
/// let rustler = rustler_jar.get(&market);
|
/// let rustler = rustler_jar.get(&market);
|
||||||
/// ```
|
/// ```
|
||||||
pub struct RustlerJar {
|
pub struct RustlerJar {
|
||||||
rustlers: HashMap<String, Box<dyn Rustler>>,
|
rustlers: HashMap<String, Arc<Mutex<Box<dyn Rustler>>>>,
|
||||||
mappings: HashMap<String, String>,
|
mappings: HashMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,18 +66,23 @@ impl RustlerJar {
|
|||||||
pub fn new(rustlers_list: Vec<Box<dyn Rustler>>, mappings: HashMap<String, String>) -> Self {
|
pub fn new(rustlers_list: Vec<Box<dyn Rustler>>, mappings: HashMap<String, String>) -> Self {
|
||||||
let mut rustlers = HashMap::new();
|
let mut rustlers = HashMap::new();
|
||||||
for rustler in rustlers_list {
|
for rustler in rustlers_list {
|
||||||
rustlers.insert(rustler.name(), rustler);
|
rustlers.insert(rustler.name(), Arc::new(Mutex::new(rustler)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Self { rustlers, mappings }
|
Self { rustlers, mappings }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// get the Rustler for the given market
|
/// get the Rustler for the given market
|
||||||
pub fn get(&self, market: &market::Model) -> Option<&Box<dyn Rustler>> {
|
pub fn get(&self, market: &market::Model) -> Option<&Arc<Mutex<Box<dyn Rustler>>>> {
|
||||||
let key = self.get_key(market);
|
let key = self.get_key(market);
|
||||||
self.rustlers.get(key)
|
self.rustlers.get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_mut(&mut self, market: &market::Model) -> Option<&mut Arc<Mutex<Box<dyn Rustler>>>> {
|
||||||
|
let key = self.get_key(market).to_owned();
|
||||||
|
self.rustlers.get_mut(&key)
|
||||||
|
}
|
||||||
|
|
||||||
/// get the key from the mappings for the given market
|
/// get the key from the mappings for the given market
|
||||||
fn get_key(&self, market: &market::Model) -> &str {
|
fn get_key(&self, market: &market::Model) -> &str {
|
||||||
self.mappings.get(&market.short_name).unwrap()
|
self.mappings.get(&market.short_name).unwrap()
|
||||||
|
|||||||
@ -1,16 +1,15 @@
|
|||||||
pub mod binance;
|
pub mod binance;
|
||||||
|
pub extern crate chrono;
|
||||||
|
pub extern crate eyre;
|
||||||
|
|
||||||
use {
|
use {
|
||||||
async_trait::async_trait,
|
async_trait::async_trait,
|
||||||
chrono::{DateTime, Local},
|
chrono::{DateTime, Local},
|
||||||
entities::{market, ticker},
|
entities::{market, ticker},
|
||||||
std::collections::HashMap,
|
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
|
std::collections::HashMap,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub extern crate eyre;
|
|
||||||
pub extern crate chrono;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||||
pub enum RustlerStatus {
|
pub enum RustlerStatus {
|
||||||
Connecting,
|
Connecting,
|
||||||
@ -73,6 +72,14 @@ impl Ticker {
|
|||||||
market: m.short_name.clone(),
|
market: m.short_name.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn many_from(tickers: &[ticker::Model], market: &market::Model) -> Vec<Self> {
|
||||||
|
tickers.iter().map(|t| Self::from(t, market)).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn key(&self) -> String {
|
||||||
|
format!("{}:{}", self.market, self.symbol)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait RustlerAccessor {
|
pub trait RustlerAccessor {
|
||||||
@ -117,9 +124,9 @@ pub trait RustlerAccessor {
|
|||||||
pub trait Rustler: RustlerAccessor + Send + Sync {
|
pub trait Rustler: RustlerAccessor + Send + Sync {
|
||||||
// #region Unimplemented trait functions
|
// #region Unimplemented trait functions
|
||||||
/// fn called after tickers are added to the rustler
|
/// fn called after tickers are added to the rustler
|
||||||
fn on_add(&mut self, tickers: Vec<Ticker>) -> Result<()>;
|
fn on_add(&mut self, tickers: &[Ticker]) -> Result<()>;
|
||||||
/// fn called after tickers are deleted from the rustler
|
/// fn called after tickers are deleted from the rustler
|
||||||
fn on_delete(&mut self, tickers: Vec<Ticker>) -> Result<()>;
|
fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()>;
|
||||||
/// connects the rustler to the data source
|
/// connects the rustler to the data source
|
||||||
async fn connect(&mut self) -> Result<()>;
|
async fn connect(&mut self) -> Result<()>;
|
||||||
/// disconnects the rustler from the data source
|
/// disconnects the rustler from the data source
|
||||||
@ -165,16 +172,16 @@ pub trait Rustler: RustlerAccessor + Send + Sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// adds new tickers to the rustler
|
/// adds new tickers to the rustler
|
||||||
async fn add(&mut self, new_tickers: Vec<Ticker>) -> Result<()> {
|
async fn add(&mut self, new_tickers: &Vec<Ticker>) -> Result<()> {
|
||||||
let tickers = self.tickers_mut();
|
let tickers = self.tickers_mut();
|
||||||
|
|
||||||
for new_ticker in &new_tickers {
|
for new_ticker in new_tickers {
|
||||||
// if the ticker already exists in the tickers map, skip it
|
// if the ticker already exists in the tickers map, skip it
|
||||||
if tickers.contains_key(&new_ticker.symbol) {
|
if tickers.contains_key(&new_ticker.key()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tickers.insert(new_ticker.symbol.clone(), new_ticker.clone());
|
tickers.insert(new_ticker.key(), new_ticker.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.opts().connect_on_add {
|
if self.opts().connect_on_add {
|
||||||
@ -189,11 +196,11 @@ pub trait Rustler: RustlerAccessor + Send + Sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// deletes tickers from the rustler
|
/// deletes tickers from the rustler
|
||||||
async fn delete(&mut self, new_tickers: Vec<Ticker>) -> Result<()> {
|
async fn delete(&mut self, new_tickers: &Vec<Ticker>) -> Result<()> {
|
||||||
let tickers = self.tickers_mut();
|
let tickers = self.tickers_mut();
|
||||||
|
|
||||||
for new_ticker in &new_tickers {
|
for new_ticker in new_tickers {
|
||||||
tickers.remove(&new_ticker.symbol);
|
tickers.remove(&new_ticker.key());
|
||||||
}
|
}
|
||||||
|
|
||||||
// if after deleting the tickers the tickers map is
|
// if after deleting the tickers the tickers map is
|
||||||
@ -232,7 +239,10 @@ macro_rules! rustler_accessors {
|
|||||||
fn status(&self) -> &$crate::rustlers::RustlerStatus {
|
fn status(&self) -> &$crate::rustlers::RustlerStatus {
|
||||||
&self.status
|
&self.status
|
||||||
}
|
}
|
||||||
fn set_status(&mut self, status: $crate::rustlers::RustlerStatus) -> $crate::rustlers::eyre::Result<()> {
|
fn set_status(
|
||||||
|
&mut self,
|
||||||
|
status: $crate::rustlers::RustlerStatus,
|
||||||
|
) -> $crate::rustlers::eyre::Result<()> {
|
||||||
self.status = status;
|
self.status = status;
|
||||||
self.handle_status_change()?;
|
self.handle_status_change()?;
|
||||||
|
|
||||||
@ -247,31 +257,56 @@ macro_rules! rustler_accessors {
|
|||||||
fn next_run(&self) -> &$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local> {
|
fn next_run(&self) -> &$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local> {
|
||||||
&self.next_run
|
&self.next_run
|
||||||
}
|
}
|
||||||
fn set_next_run(&mut self, next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>) {
|
fn set_next_run(
|
||||||
|
&mut self,
|
||||||
|
next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>,
|
||||||
|
) {
|
||||||
self.next_run = next_run;
|
self.next_run = next_run;
|
||||||
}
|
}
|
||||||
fn next_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
fn next_stop(
|
||||||
|
&self,
|
||||||
|
) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
||||||
&self.next_stop
|
&self.next_stop
|
||||||
}
|
}
|
||||||
fn set_next_stop(&mut self, next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
|
fn set_next_stop(
|
||||||
|
&mut self,
|
||||||
|
next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
|
||||||
|
) {
|
||||||
self.next_stop = next_stop;
|
self.next_stop = next_stop;
|
||||||
}
|
}
|
||||||
fn last_run(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
fn last_run(
|
||||||
|
&self,
|
||||||
|
) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
||||||
&self.last_run
|
&self.last_run
|
||||||
}
|
}
|
||||||
fn set_last_run(&mut self, last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
|
fn set_last_run(
|
||||||
|
&mut self,
|
||||||
|
last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
|
||||||
|
) {
|
||||||
self.last_run = last_run;
|
self.last_run = last_run;
|
||||||
}
|
}
|
||||||
fn last_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
fn last_stop(
|
||||||
|
&self,
|
||||||
|
) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
||||||
&self.last_stop
|
&self.last_stop
|
||||||
}
|
}
|
||||||
fn set_last_stop(&mut self, last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
|
fn set_last_stop(
|
||||||
|
&mut self,
|
||||||
|
last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
|
||||||
|
) {
|
||||||
self.last_stop = last_stop;
|
self.last_stop = last_stop;
|
||||||
}
|
}
|
||||||
fn last_update(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
fn last_update(
|
||||||
|
&self,
|
||||||
|
) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
|
||||||
&self.last_update
|
&self.last_update
|
||||||
}
|
}
|
||||||
fn set_last_update(&mut self, last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
|
fn set_last_update(
|
||||||
|
&mut self,
|
||||||
|
last_update: Option<
|
||||||
|
$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
self.last_update = last_update;
|
self.last_update = last_update;
|
||||||
}
|
}
|
||||||
fn opts(&self) -> &$crate::rustlers::RustlerOpts {
|
fn opts(&self) -> &$crate::rustlers::RustlerOpts {
|
||||||
|
|||||||
@ -1,9 +1,7 @@
|
|||||||
use {
|
use {
|
||||||
crate::{
|
crate::{
|
||||||
rustler,
|
rustler,
|
||||||
rustlers::{
|
rustlers::{Rustler, RustlerAccessor, RustlerStatus, Ticker},
|
||||||
Rustler, RustlerAccessor, RustlerStatus, Ticker
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
async_trait::async_trait,
|
async_trait::async_trait,
|
||||||
eyre::Result,
|
eyre::Result,
|
||||||
@ -23,7 +21,7 @@ rustler!(
|
|||||||
);
|
);
|
||||||
|
|
||||||
impl BinanceRustler {
|
impl BinanceRustler {
|
||||||
pub fn new() -> impl Rustler {
|
pub fn create() -> impl Rustler {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -51,13 +49,13 @@ impl Rustler for BinanceRustler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_add(&mut self, tickers: Vec<Ticker>) -> Result<()> {
|
fn on_add(&mut self, tickers: &[Ticker]) -> Result<()> {
|
||||||
info!("Adding tickers: {:?}", tickers);
|
info!("Adding tickers: {:?}", tickers);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_delete(&mut self, tickers: Vec<Ticker>) -> Result<()> {
|
fn on_delete(&mut self, tickers: &[Ticker]) -> Result<()> {
|
||||||
info!("Deleting tickers: {:?}", tickers);
|
info!("Deleting tickers: {:?}", tickers);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@ -1,16 +1,21 @@
|
|||||||
use entities::{market, sea_orm::DatabaseConnection, ticker};
|
use {
|
||||||
use eyre::{Ok, Result};
|
entities::{market, sea_orm::DatabaseConnection, ticker},
|
||||||
use lool::{
|
eyre::Result,
|
||||||
|
lool::{
|
||||||
logger::{info, warn},
|
logger::{info, warn},
|
||||||
s,
|
|
||||||
sched::{
|
sched::{
|
||||||
recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet,
|
recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet,
|
||||||
SchedulingRule,
|
SchedulingRule,
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
std::sync::Arc,
|
||||||
|
tokio::sync::Mutex,
|
||||||
};
|
};
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use crate::rustlers::{Rustler, Ticker};
|
use crate::{
|
||||||
|
rustlerjar::RustlerJar,
|
||||||
|
rustlers::{Rustler, Ticker},
|
||||||
|
};
|
||||||
|
|
||||||
// interface MarketExecData {
|
// interface MarketExecData {
|
||||||
// entity: MarketModel;
|
// entity: MarketModel;
|
||||||
@ -23,27 +28,28 @@ use crate::rustlers::{Rustler, Ticker};
|
|||||||
// // subscription?: Subscription
|
// // subscription?: Subscription
|
||||||
// }]]
|
// }]]
|
||||||
|
|
||||||
struct MarketExecData {
|
// struct MarketExecData {
|
||||||
// entity: MarketModel,
|
// entity: MarketModel,
|
||||||
// startJob?: Job,
|
// startJob?: Job,
|
||||||
// stopJob?: Job,
|
// stopJob?: Job,
|
||||||
}
|
// }
|
||||||
|
|
||||||
type RustlerFactory = Box<dyn Send + Sync + FnMut(&market::Model) -> Option<Box<dyn Rustler>>>;
|
|
||||||
|
|
||||||
pub struct RustlersSvc {
|
pub struct RustlersSvc {
|
||||||
market_svc: market::Service,
|
market_svc: market::Service,
|
||||||
factory: RustlerFactory,
|
sched: Scheduler,
|
||||||
|
rustlers: RustlerJar,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RustlersSvc {
|
impl RustlersSvc {
|
||||||
pub async fn new(conn: DatabaseConnection) -> Self {
|
/// creates a new instance of the `RustlersSvc`
|
||||||
|
pub async fn new(conn: DatabaseConnection, rustlers: Option<RustlerJar>) -> Self {
|
||||||
let market_svc = market::Service::new(conn).await;
|
let market_svc = market::Service::new(conn).await;
|
||||||
let factory = |_mkt: &market::Model| None;
|
let sched = Scheduler::new();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
market_svc,
|
market_svc,
|
||||||
factory: Box::new(factory),
|
rustlers: rustlers.unwrap(),
|
||||||
|
sched,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,7 +60,7 @@ impl RustlersSvc {
|
|||||||
let markets = self.market_svc.get_all_with_tickers().await?;
|
let markets = self.market_svc.get_all_with_tickers().await?;
|
||||||
|
|
||||||
for (market, tickers) in markets {
|
for (market, tickers) in markets {
|
||||||
self.start_rustler_for((market, tickers)).await?;
|
self.schedule_rustler_for((market, tickers)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -65,29 +71,45 @@ impl RustlersSvc {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// gets the corresponding rustler for the given market and starts it
|
/// gets the right rustler for the given market and starts it
|
||||||
///
|
///
|
||||||
/// depending on the market configuraation, the rustler might be started
|
/// depending on the market configuraation, the rustler might be started
|
||||||
/// immediately or its start might be scheduled for a later time
|
/// immediately or its start might be scheduled for a later time
|
||||||
///
|
///
|
||||||
/// this function also schedules the stop of the rustler at the end of the market
|
/// this function also schedules the stop of the rustler at the end of the market
|
||||||
/// trading hours if the market is configured to stop at a specific time
|
/// trading hours if the market is configured to stop at a specific time
|
||||||
async fn start_rustler_for(
|
async fn schedule_rustler_for(
|
||||||
&mut self,
|
&mut self,
|
||||||
market: (market::Model, Vec<ticker::Model>),
|
market: (market::Model, Vec<ticker::Model>),
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (market, tickers) = market;
|
let (market, tickers) = market;
|
||||||
|
let tickers: Vec<Ticker> = tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
|
||||||
|
|
||||||
let rules = self.get_schedule_rules_for(&market)?;
|
let rules = self.get_schedule_rules_for(&market)?;
|
||||||
let rustler = (self.factory)(&market);
|
let rustler = self.rustlers.get(&market);
|
||||||
|
|
||||||
if let Some(rustler) = rustler {
|
if let Some(rustler) = rustler {
|
||||||
if let Some((start, end)) = rules {
|
if let Some((start, end)) = rules {
|
||||||
let mut sched = Scheduler::new();
|
let start_name = format!("start-rustler-{}", market.short_name);
|
||||||
let start_name = format!("start-{}", market.short_name);
|
let end_name = format!("end-rustler-{}", market.short_name);
|
||||||
let end_name = format!("end-{}", market.short_name);
|
|
||||||
|
|
||||||
let start_job = sched.schedule(start_name, || async move {}, start).await;
|
let _start_job = self
|
||||||
let end_job = sched.schedule(end_name, || async move {}, end).await;
|
.sched
|
||||||
|
.schedule_fut(
|
||||||
|
start_name.to_owned(),
|
||||||
|
Self::start_rustler_for(rustler.clone(), tickers.clone()),
|
||||||
|
start,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let _end_job = self
|
||||||
|
.sched
|
||||||
|
.schedule_fut(
|
||||||
|
end_name.to_owned(),
|
||||||
|
Self::stop_rustler_for(rustler.clone(), tickers),
|
||||||
|
end,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
@ -123,44 +145,56 @@ impl RustlersSvc {
|
|||||||
Ok(Some((recur(&start_rule), recur(&stop_rule))))
|
Ok(Some((recur(&start_rule), recur(&stop_rule))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// get the rustler according to the market
|
|
||||||
fn get_rustler_for(&mut self, market: &market::Model) -> Option<Box<dyn Rustler>> {
|
|
||||||
let scrapper = (self.factory)(market);
|
|
||||||
|
|
||||||
|
|
||||||
scrapper
|
|
||||||
}
|
|
||||||
|
|
||||||
/// starts a rustler by adding the tickers to it
|
/// starts a rustler by adding the tickers to it
|
||||||
async fn start_rustler(
|
async fn start_rustler_for(rustler: Arc<Mutex<Box<dyn Rustler>>>, tickers: Vec<Ticker>) {
|
||||||
rustler: &mut Box<dyn Rustler>,
|
let mut rustler = rustler.lock().await;
|
||||||
market: market::Model,
|
match rustler.start().await {
|
||||||
tickers: Vec<ticker::Model>,
|
Ok(()) => {
|
||||||
) -> Result<()> {
|
if !tickers.is_empty() {
|
||||||
if tickers.len() > 0 {
|
info!("Rustler {} started for market", rustler.name());
|
||||||
let tickers: Vec<Ticker> =
|
|
||||||
tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
|
|
||||||
|
|
||||||
rustler.add(tickers).await?;
|
match rustler.add(&tickers).await {
|
||||||
|
Ok(()) => info!(
|
||||||
|
"Tickers {:?} added to rustler '{}'",
|
||||||
|
tickers,
|
||||||
|
rustler.name()
|
||||||
|
),
|
||||||
|
Err(e) => warn!(
|
||||||
|
"Failed to add tickers to rustler '{}': {}",
|
||||||
|
rustler.name(),
|
||||||
|
e
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => warn!("Failed to start rustler '{}': {}", rustler.name(), e),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
/// stops a rustler for the given market/tickers
|
||||||
|
///
|
||||||
|
/// if the rustler is being used by other markets, or the ticker list does not contain
|
||||||
|
/// all the tickers that the rustler is using for the given market, the rustler will not
|
||||||
|
/// be stopped, but will stop gathering data for the given tickers.
|
||||||
|
async fn stop_rustler_for(rustler: Arc<Mutex<Box<dyn Rustler>>>, tickers: Vec<Ticker>) {
|
||||||
|
let mut rustler = rustler.lock().await;
|
||||||
|
|
||||||
|
if !tickers.is_empty() {
|
||||||
|
// we delete the tickers from the rustler, but it will still be running if
|
||||||
|
// there are other markets using the same rustler.
|
||||||
|
match rustler.delete(&tickers).await {
|
||||||
|
Ok(()) => info!(
|
||||||
|
"Tickers {:?} removed from rustler '{}'",
|
||||||
|
tickers,
|
||||||
|
rustler.name()
|
||||||
|
),
|
||||||
|
Err(e) => warn!(
|
||||||
|
"Failed to remove tickers from rustler '{}': {}",
|
||||||
|
rustler.name(),
|
||||||
|
e
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// stops a rustler by deleting all its tickers
|
|
||||||
async fn stop_rustler(
|
|
||||||
rustler: &mut Box<dyn Rustler>,
|
|
||||||
market: market::Model,
|
|
||||||
tickers: Vec<ticker::Model>,
|
|
||||||
) -> Result<()> {
|
|
||||||
if tickers.len() > 0 {
|
|
||||||
let tickers: Vec<Ticker> =
|
|
||||||
tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
|
|
||||||
|
|
||||||
rustler.delete(tickers).await?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user