wip: 🚧 core functionality

This commit is contained in:
Lucas Colombo 2024-04-18 05:58:07 -03:00
parent adb088cadc
commit 4fd13330cf
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
28 changed files with 1238 additions and 176 deletions

View File

@ -6,10 +6,6 @@
}
</style>
<path class="a" d="M32,22H28a4,4,0,0,0-4,4H22a4,4,0,0,0,4,4h4a4,4,0,0,0-4-4h2A4,4,0,0,0,32,22Z" />
<path class="a" d="M9,31V21H24.95508A10.97052,10.97052,0,0,1,14,31Z" />
<path class="a" d="M23.80175,22A9.97326,9.97326,0,0,1,14,30H10V22H23.80175m1.68111-2H8.5a.5.5,0,0,0-.5.5v11a.5.5,0,0,0,.5.5H14A12.00035,12.00035,0,0,0,25.989,20.51848.5052.5052,0,0,0,25.48286,20Z" />
<path class="a" d="M8.5,14H12a0,0,0,0,1,0,0v0a4,4,0,0,1-4,4H8a0,0,0,0,1,0,0V14.5A.5.5,0,0,1,8.5,14Z" />
<path class="a" d="M18.5,14H22a0,0,0,0,1,0,0v0a4,4,0,0,1-4,4h0a0,0,0,0,1,0,0V14.5A.5.5,0,0,1,18.5,14Z" />
<path class="a" d="M30,4.5V8H26V.5a.5.5,0,0,0-.5-.5h-7a.5.5,0,0,0-.5.5V2H14V.5a.5.5,0,0,0-.5-.5h-5A.5.5,0,0,0,8,.5V8H4V4.5A.5.5,0,0,0,3.5,4H.5a.5.5,0,0,0-.5.5V8a4,4,0,0,0,4,4H30a4,4,0,0,0,4-4V4.5a.5.5,0,0,0-.5-.5h-3A.5.5,0,0,0,30,4.5ZM22,8H12V4h2V5.5a.5.5,0,0,0,.5.5h3a.5.5,0,0,0,.5-.5V4h4Z" />
<path class="a" d="M32,20H28a3.96994,3.96994,0,0,0-2.323.75452,12.03549,12.03549,0,0,0,.312-2.236.5052.5052,0,0,0-.49176-.51831L25.48285,18H8.5a.5.5,0,0,0-.5.5v11a.5.5,0,0,0,.5.5H14a11.95959,11.95959,0,0,0,8.69525-3.74884A3.99448,3.99448,0,0,0,26,28h4a4,4,0,0,0-4-4h2A4,4,0,0,0,32,20Z" />
<path class="a" d="M30,6.5V10H26V2.5a.5.5,0,0,0-.5-.5h-7a.5.5,0,0,0-.5.5V4H14V2.5a.5.5,0,0,0-.5-.5h-5a.5.5,0,0,0-.5.5V10H4V6.5A.5.5,0,0,0,3.5,6H.5a.5.5,0,0,0-.5.5V10a4,4,0,0,0,4,4H30a4,4,0,0,0,4-4V6.5a.5.5,0,0,0-.5-.5h-3A.5.5,0,0,0,30,6.5ZM22,10H12V6h2V7.5a.5.5,0,0,0,.5.5h3a.5.5,0,0,0,.5-.5V6h4Z" />
</svg>

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 801 B

View File

@ -1,7 +1,7 @@
{
"files.exclude": {
// 🧩 modules
"migration": true,
// "migration": true,
// "entities": true,
//
@ -24,12 +24,12 @@
// 🗑
"check_size.py": true,
"*.code-workspace": true,
// ".gitignore": true,
".gitignore": true,
".vscode": true,
".git": true,
// 🗄 database
"rustler.db": true,
// "rustler.db": true,
"rustler.db-shm": true,
"rustler.db-wal": true,
"rustler.db.bkp": true,

128
Cargo.lock generated
View File

@ -317,9 +317,9 @@ checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799"
[[package]]
name = "async-trait"
version = "0.1.78"
version = "0.1.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85"
checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681"
dependencies = [
"proc-macro2",
"quote",
@ -358,7 +358,7 @@ dependencies = [
"bitflags 1.3.2",
"bytes",
"futures-util",
"http",
"http 0.2.12",
"http-body",
"hyper",
"itoa",
@ -384,7 +384,7 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http 0.2.12",
"http-body",
"mime",
"rustversion",
@ -566,14 +566,16 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
[[package]]
name = "chrono"
version = "0.4.35"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a"
checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.4",
]
@ -703,6 +705,12 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "der"
version = "0.7.8"
@ -1066,6 +1074,18 @@ dependencies = [
"wasi",
]
[[package]]
name = "getset"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9"
dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "gimli"
version = "0.28.1"
@ -1113,7 +1133,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"http 0.2.12",
"indexmap 2.2.6",
"slab",
"tokio",
@ -1214,6 +1234,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
@ -1221,7 +1252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http",
"http 0.2.12",
"pin-project-lite",
]
@ -1248,13 +1279,13 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http 0.2.12",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.6",
"socket2 0.4.10",
"tokio",
"tower-service",
"tracing",
@ -1470,13 +1501,17 @@ dependencies = [
[[package]]
name = "lool"
version = "0.0.5"
version = "0.1.4"
source = "sparse+http://lugit.local/api/packages/lucodear/cargo/"
checksum = "c727e0c375380178e0b15f8f54f249e5e2a9e72c20475e8a7807f4c72d253106"
checksum = "4320ce16557a892f40c0a776371ac0c3cda2c9312c83065a4ab2a1facf571539"
dependencies = [
"bitflags 2.5.0",
"chrono",
"eyre",
"log",
"num-traits",
"tokio",
"tokio_schedule",
]
[[package]]
@ -2258,14 +2293,30 @@ dependencies = [
name = "rustler"
version = "0.0.0-alpha.0"
dependencies = [
"chrono",
"dotenvy",
"entities",
"eyre",
"grpc",
"lool",
"rustlers",
"tokio",
]
[[package]]
name = "rustlers"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"entities",
"eyre",
"getset",
"lool",
"tokio-tungstenite",
"tonic-build",
]
[[package]]
name = "rustversion"
version = "1.0.14"
@ -3015,9 +3066,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.36.0"
version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
@ -3062,6 +3113,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
@ -3076,6 +3139,16 @@ dependencies = [
"tracing",
]
[[package]]
name = "tokio_schedule"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b4a14ab1658c39d137ebcc5fbaab364c9922a6cc04ab48b364546c2e6022256"
dependencies = [
"chrono",
"tokio",
]
[[package]]
name = "toml_datetime"
version = "0.6.5"
@ -3105,7 +3178,7 @@ dependencies = [
"base64",
"bytes",
"h2",
"http",
"http 0.2.12",
"http-body",
"hyper",
"hyper-timeout",
@ -3218,6 +3291,25 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -3274,6 +3366,12 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.1"

View File

@ -22,21 +22,26 @@ name = "rustler"
path = "app/main.rs"
[workspace]
members = [".", "migration", "entities", "grpc"]
members = [".", "migration", "entities", "grpc", "rustlers"]
[workspace.dependencies]
lool = { version = "0.0.5", registry = "lugit" } # crates: disable-check
lool = { version = "0.1.4", registry = "lugit" } # crates: disable-check
eyre = { version = "0.6.12", default-features = false }
tokio-tungstenite = { version = "0.21.0" }
[dependencies]
# internal
entities = { path = "entities" }
grpc = { path = "grpc" }
rustlers = { path = "rustlers" }
# workspace
eyre = { workspace = true, default-features = false, features = ["auto-install"] }
lool = { workspace = true, features = ["logger", "macros", "cli-stylize"] }
eyre = { workspace = true, default-features = false, features = [
"auto-install",
] }
lool = { workspace = true }
# external
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
dotenvy = "0.15.7"
chrono = "0.4.37"

View File

@ -34,6 +34,11 @@ tasks:
cmds:
- cargo +nightly fmt --all
test:
desc: 🧪 test lool
cmds:
- cargo nextest run --all-features --workspace
lint:
desc: 🧶 lint rustler
cmds:

View File

@ -1,7 +1,7 @@
use {
dotenvy::dotenv,
eyre::Result,
lool::logger::{info, ConsoleLogger, Level},
lool::logger::{info, ConsoleLogger, Level}, tokio::{join, select},
};
// TODO: here we will trigger the start of both the grpc server and the websocket gateway
@ -13,8 +13,12 @@ async fn main() -> Result<()> {
dotenv()?;
let conn = entities::db::get_connection().await?;
let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone()).await;
grpc::server::start(conn.clone()).await?;
let (grpc_res, rustlers_res) = join! {
grpc::server::start(conn.clone()),
rustler.start(),
};
info!("Shutting down");
Ok(())

View File

@ -11,7 +11,7 @@ path = "src/lib.rs"
[dependencies]
# common
eyre = { workspace = true, default-features = false }
lool = { workspace = true }
lool = { workspace = true, features = [ "cli", "cli.stylize", "macros", "logger" ] }
# external
sea-orm = { version = "0.12.15", features = [

View File

@ -1,8 +1,28 @@
pub mod market;
pub mod ticker;
pub use sea_orm;
mod orm {
#[path = "market.rs"] pub mod market;
#[path = "ticker.rs"] pub mod ticker;
}
mod services {
#[path = "market.rs"] pub mod market;
#[path = "ticker.rs"] pub mod ticker;
}
/// market entities and services
pub mod market {
pub use super::orm::market::*;
pub use super::services::market::*;
}
/// ticker entities and services
pub mod ticker {
pub use super::orm::ticker::*;
pub use super::services::ticker::*;
}
/// database connection stuff
pub mod db {
use {
eyre::Result,
@ -25,14 +45,14 @@ pub mod db {
conn_str
}
pub async fn get_connection() -> Result<Arc<DatabaseConnection>> {
pub async fn get_connection() -> Result<DatabaseConnection> {
let db_conn_str =
std::env::var(RUSTLER_DATABASE).unwrap_or_else(|_| get_default_conn_str());
let mut conn_opts = ConnectOptions::new(db_conn_str.to_owned());
conn_opts.sqlx_logging(false);
let conn = Arc::new(Database::connect(conn_opts).await?);
let conn = Database::connect(conn_opts).await?;
conn.query_one(Statement::from_string(
DbBackend::Sqlite,

View File

@ -1,29 +0,0 @@
use {
super::{Entity as MarketEntity, Model as MarketModel},
eyre::Result,
sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel},
std::{sync::Arc, time::Instant},
};
pub struct Service {
conn: Arc<DatabaseConnection>,
}
impl Service {
pub async fn new(conn: Arc<DatabaseConnection>) -> Self {
Self { conn }
}
pub async fn get_all(&self) -> Result<Vec<MarketModel>> {
// let conn = self.conn.unwrap();
let markets = MarketEntity::find().all(&*self.conn).await?;
Ok(markets)
}
pub async fn create(&self, market: MarketModel) -> Result<MarketModel> {
let start = Instant::now();
MarketEntity::insert(market.clone().into_active_model()).exec(&*self.conn).await?;
println!("insert market model took {:?}", start.elapsed());
Ok(market)
}
}

View File

@ -1,7 +1,6 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15
mod svc;
use sea_orm::entity::prelude::*;
pub use svc::Service;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "market")]
@ -10,13 +9,13 @@ pub struct Model {
pub id: String,
pub short_name: String,
pub full_name: String,
pub opens_from: i32,
pub opens_till: i32,
pub open_time: String,
pub close_time: String,
pub pre_market_offset: i32,
pub post_market_offset: i32,
pub time_zone_offset: i32,
pub opens_from: Option<u32>,
pub opens_till: Option<u32>,
pub open_time: Option<String>,
pub close_time: Option<String>,
pub pre_market_offset: Option<u32>,
pub post_market_offset: Option<u32>,
pub time_zone_offset: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -0,0 +1,37 @@
use {
crate::{
market::{Entity as Market, Model as MarketModel},
ticker::{Entity as Ticker, Model as TickerModel},
},
eyre::Result,
sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel}
};
pub struct Service {
conn: DatabaseConnection,
}
impl Service {
pub async fn new(conn: DatabaseConnection) -> Self {
Self { conn }
}
pub async fn get_all(&self) -> Result<Vec<MarketModel>, DbErr> {
let markets = Market::find().all(&self.conn).await?;
Ok(markets)
}
pub async fn create(&self, market: MarketModel) -> Result<MarketModel, DbErr> {
Market::insert(market.clone().into_active_model()).exec(&self.conn).await?;
Ok(market)
}
pub async fn get_all_with_tickers(
&self,
) -> Result<Vec<(MarketModel, Vec<TickerModel>)>, DbErr> {
let markets_with_tickers: Vec<(MarketModel, Vec<TickerModel>)> =
Market::find().find_with_related(Ticker).all(&self.conn).await?;
Ok(markets_with_tickers)
}
}

View File

@ -0,0 +1,40 @@
use {
crate::{
orm::ticker,
ticker::{Entity as Ticker, Model as TickerModel},
},
eyre::Result,
sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter}
};
pub struct Service {
conn: DatabaseConnection,
}
impl Service {
pub async fn new(conn: DatabaseConnection) -> Self {
Self { conn }
}
pub async fn get_all(&self) -> Result<Vec<TickerModel>, DbErr> {
let tickers = Ticker::find().all(&self.conn).await?;
Ok(tickers)
}
pub async fn get(&self, id: String) -> Result<Option<TickerModel>, DbErr> {
let ticker = Ticker::find_by_id(id).one(&self.conn).await?;
Ok(ticker)
}
pub async fn get_by_symbol(&self, symbol: String) -> Result<Option<TickerModel>, DbErr> {
let ticker =
Ticker::find().filter(ticker::Column::Symbol.eq(symbol)).one(&self.conn).await?;
Ok(ticker)
}
pub async fn create(&self, market: TickerModel) -> Result<TickerModel, DbErr> {
Ticker::insert(market.clone().into_active_model()).exec(&self.conn).await?;
Ok(market)
}
}

View File

@ -1,5 +1,5 @@
fn main() {
let proto_files = vec!["./proto/rustler.proto", "./proto/market.proto"];
let proto_files = vec!["./proto/rustler.proto", "./proto/market.proto", "./proto/ticker.proto"];
for proto_file in proto_files {
compile_proto(proto_file);

View File

@ -13,13 +13,13 @@ message Market {
string id = 1;
string short_name = 2;
string full_name = 3;
int32 opens_from = 4;
int32 opens_till = 5;
string open_time = 6;
string close_time = 7;
int32 pre_market_offset = 8;
int32 post_market_offset = 9;
int32 time_zone_offset = 10;
optional uint32 opens_from = 4;
optional uint32 opens_till = 5;
optional string open_time = 6;
optional string close_time = 7;
optional uint32 pre_market_offset = 8;
optional uint32 post_market_offset = 9;
optional string time_zone_offset = 10;
}
message Markets {

29
grpc/proto/ticker.proto Normal file
View File

@ -0,0 +1,29 @@
syntax = "proto3";
package ticker;
service TickerApi {
rpc GetAll (Empty) returns (Tickers) {}
rpc Create (Ticker) returns (Ticker) {}
rpc Get (TickerId) returns (Ticker) {}
}
message TickerId {
string id = 1;
}
message TickerSymbol {
string symbol = 1;
}
message Empty { }
message Ticker {
string id = 1;
string symbol = 2;
string market_id = 3;
}
message Tickers {
repeated Ticker tickers = 1;
}

View File

@ -1 +1,22 @@
mod services {
use entities::sea_orm::{DbErr, SqlErr};
pub mod market;
pub mod ticker;
pub(crate) fn handle_sql_err(err: DbErr, action: &str, entity_name: &str) -> tonic::Status {
let sqlerr = err.sql_err();
match sqlerr {
Some(SqlErr::UniqueConstraintViolation(_)) => {
tonic::Status::already_exists(format!("{} already exists", entity_name))
}
Some(SqlErr::ForeignKeyConstraintViolation(_)) => {
tonic::Status::failed_precondition(format!("Related entity does not exist"))
}
_ => tonic::Status::internal(format!("Error {} {}", action, entity_name)),
}
}
}
pub mod server;

View File

@ -1,93 +1,16 @@
use {
self::market_mod::Empty,
entities::{market, sea_orm::DatabaseConnection},
crate::services,
entities::{market, sea_orm::DatabaseConnection, ticker},
eyre::Result,
lool::{cli::stylize::Stylize, logger::info},
market_mod::{
market_api_server::{MarketApi, MarketApiServer},
Market, Markets,
},
std::{net::SocketAddr, sync::Arc, time::Instant},
tonic::{transport::Server, Request, Response, Status},
std::{net::SocketAddr, sync::Arc},
tonic::transport::Server,
};
pub mod market_mod {
tonic::include_proto!("market");
}
const RUSTLER_GRPC_API_ADDR: &str = "RUSTLER_GRPC_API_ADDR";
impl Market {
fn into_model(self) -> market::Model {
market::Model {
id: self.id,
short_name: self.short_name,
full_name: self.full_name,
opens_from: self.opens_from,
opens_till: self.opens_till,
open_time: self.open_time,
close_time: self.close_time,
pre_market_offset: self.pre_market_offset,
post_market_offset: self.post_market_offset,
time_zone_offset: self.time_zone_offset,
}
}
fn from_model(model: market::Model) -> Self {
Self {
id: model.id,
short_name: model.short_name,
full_name: model.full_name,
opens_from: model.opens_from,
opens_till: model.opens_till,
open_time: model.open_time,
close_time: model.close_time,
pre_market_offset: model.pre_market_offset,
post_market_offset: model.post_market_offset,
time_zone_offset: model.time_zone_offset,
}
}
}
pub struct GrpcServer {
svc: market::Service,
}
#[tonic::async_trait]
impl MarketApi for GrpcServer {
async fn get_all(&self, _: Request<Empty>) -> Result<Response<Markets>, Status> {
let start = Instant::now();
let response = if let Ok(mkts) = self.svc.get_all().await {
Ok(Response::new(Markets {
markets: mkts.into_iter().map(Market::from_model).collect(),
}))
} else {
Err(Status::internal("Failed to get markets"))
};
info!("`MarketApi.get_all` took {:?}", start.elapsed());
response
}
async fn create(&self, market: Request<Market>) -> Result<Response<Market>, Status> {
let start = Instant::now();
let mkt = market.into_inner().into_model();
let response = if let Ok(m) = self.svc.create(mkt).await {
Ok(Response::new(Market::from_model(m)))
} else {
Err(Status::internal("Failed to create market"))
};
info!("`MarketApi.create` took {:?}", start.elapsed());
response
}
}
/// Starts the gRPC server
pub async fn start(conn: Arc<DatabaseConnection>) -> Result<()> {
pub async fn start(conn: DatabaseConnection) -> Result<()> {
fn get_default_addr() -> String {
let addr = "0.0.0.0:50051";
info!(
@ -101,14 +24,22 @@ pub async fn start(conn: Arc<DatabaseConnection>) -> Result<()> {
let addr: SocketAddr =
std::env::var(RUSTLER_GRPC_API_ADDR).unwrap_or_else(|_| get_default_addr()).parse()?;
let svc = market::Service::new(conn).await;
let server = GrpcServer { svc };
let market_db = market::Service::new(conn.clone()).await;
let ticker_db = ticker::Service::new(conn.clone()).await;
let market_grpc = services::market::GrpcServer { svc: market_db };
let ticker_grpc = services::ticker::GrpcServer { svc: ticker_db };
info!(
"🎉 gRPC server listening on {}",
addr.clone().to_string().green()
);
Server::builder().add_service(MarketApiServer::new(server)).serve(addr).await?;
Server::builder()
.add_service(market_grpc.svc())
.add_service(ticker_grpc.svc())
.serve(addr)
.await?;
Ok(())
}

View File

@ -0,0 +1,99 @@
use {
self::market_mod::Empty,
crate::services::handle_sql_err,
entities::market,
eyre::Result,
lool::logger::{error, info},
market_mod::{
market_api_server::{MarketApi, MarketApiServer},
Market, Markets,
},
std::{any::Any, fmt::Debug, time::Instant},
tonic::{Request, Response, Status},
};
pub mod market_mod {
tonic::include_proto!("market");
}
impl Market {
fn into_model(self) -> market::Model {
market::Model {
id: self.id,
short_name: self.short_name,
full_name: self.full_name,
opens_from: self.opens_from,
opens_till: self.opens_till,
open_time: self.open_time,
close_time: self.close_time,
pre_market_offset: self.pre_market_offset,
post_market_offset: self.post_market_offset,
time_zone_offset: self.time_zone_offset,
}
}
fn from_model(model: market::Model) -> Self {
Self {
id: model.id,
short_name: model.short_name,
full_name: model.full_name,
opens_from: model.opens_from,
opens_till: model.opens_till,
open_time: model.open_time,
close_time: model.close_time,
pre_market_offset: model.pre_market_offset,
post_market_offset: model.post_market_offset,
time_zone_offset: model.time_zone_offset,
}
}
}
pub struct GrpcServer {
pub(crate) svc: market::Service,
}
impl GrpcServer {
pub fn log_if_err<T: Any, K: Debug>(&self, res: &Result<T, K>) {
if let Err(err) = &res {
error!("{:?}", err);
}
}
pub fn svc(self) -> MarketApiServer<GrpcServer> {
MarketApiServer::new(self)
}
}
#[tonic::async_trait]
impl MarketApi for GrpcServer {
async fn get_all(&self, _: Request<Empty>) -> Result<Response<Markets>, Status> {
let start = Instant::now();
let result = self.svc.get_all().await;
self.log_if_err(&result);
let response = match result {
Ok(mkts) => Ok(Response::new(Markets {
markets: mkts.to_owned().into_iter().map(Market::from_model).collect(),
})),
Err(err) => Err(handle_sql_err(err, "Getting", "markets")),
};
info!("`MarketApi.get_all` took {:?}", start.elapsed());
response
}
async fn create(&self, market: Request<Market>) -> Result<Response<Market>, Status> {
let start = Instant::now();
let mkt = market.into_inner().into_model();
let result = self.svc.create(mkt).await;
self.log_if_err(&result);
let response = match result {
Ok(m) => Ok(Response::new(Market::from_model(m))),
Err(err) => Err(handle_sql_err(err, "creating", "market")),
};
info!("`MarketApi.create` took {:?}", start.elapsed());
response
}
}

105
grpc/src/services/ticker.rs Normal file
View File

@ -0,0 +1,105 @@
use {
crate::services::handle_sql_err,
entities::ticker,
eyre::Result,
lool::logger::{error, info},
std::{any::Any, fmt::Debug, time::Instant},
ticker_mod::{
ticker_api_server::{TickerApi, TickerApiServer},
Empty, Ticker, TickerId, Tickers,
},
tonic::{Request, Response, Status},
};
pub mod ticker_mod {
tonic::include_proto!("ticker");
}
impl Ticker {
fn into_model(self) -> ticker::Model {
ticker::Model {
id: self.id,
symbol: self.symbol,
market_id: self.market_id,
}
}
fn from_model(model: ticker::Model) -> Self {
Self {
id: model.id,
symbol: model.symbol,
market_id: model.market_id,
}
}
}
pub struct GrpcServer {
pub(crate) svc: ticker::Service,
}
impl GrpcServer {
pub fn log_if_err<T: Any, K: Debug>(&self, res: &Result<T, K>) {
if let Err(err) = &res {
error!("{:?}", err);
}
}
pub fn svc(self) -> TickerApiServer<GrpcServer> {
TickerApiServer::new(self)
}
}
#[tonic::async_trait]
impl TickerApi for GrpcServer {
async fn get(&self, req: Request<TickerId>) -> Result<Response<Ticker>, Status> {
let start = Instant::now();
let mkt = req.into_inner();
let result = self.svc.get(mkt.id).await;
self.log_if_err(&result);
let response = match result {
Ok(m) => {
if let Some(m) = m {
Ok(Response::new(Ticker::from_model(m)))
} else {
Err(Status::not_found("Ticker not found"))
}
}
Err(err) => Err(handle_sql_err(err, "Getting", "ticker")),
};
info!("`TickerApi.get` took {:?}", start.elapsed());
response
}
async fn get_all(&self, _: Request<Empty>) -> Result<Response<Tickers>, Status> {
let start = Instant::now();
let result = self.svc.get_all().await;
self.log_if_err(&result);
let response = match result {
Ok(mkts) => Ok(Response::new(Tickers {
tickers: mkts.to_owned().into_iter().map(Ticker::from_model).collect(),
})),
Err(err) => Err(handle_sql_err(err, "Getting", "tickers")),
};
info!("`TickerApi.get_all` took {:?}", start.elapsed());
response
}
async fn create(&self, market: Request<Ticker>) -> Result<Response<Ticker>, Status> {
let start = Instant::now();
let mkt = market.into_inner().into_model();
let result = self.svc.create(mkt).await;
self.log_if_err(&result);
let response = match result {
Ok(m) => Ok(Response::new(Ticker::from_model(m))),
Err(err) => Err(handle_sql_err(err, "creating", "ticker")),
};
info!("`TickerApi.create` took {:?}", start.elapsed());
response
}
}

View File

@ -14,13 +14,13 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Market::Id).string().not_null().primary_key())
.col(ColumnDef::new(Market::ShortName).string().not_null())
.col(ColumnDef::new(Market::FullName).string().not_null())
.col(ColumnDef::new(Market::OpensFrom).integer().not_null())
.col(ColumnDef::new(Market::OpensTill).integer().not_null())
.col(ColumnDef::new(Market::OpenTime).string().not_null())
.col(ColumnDef::new(Market::CloseTime).string().not_null())
.col(ColumnDef::new(Market::PreMarketOffset).integer().not_null())
.col(ColumnDef::new(Market::PostMarketOffset).integer().not_null())
.col(ColumnDef::new(Market::TimeZoneOffset).integer().not_null())
.col(ColumnDef::new(Market::OpensFrom).unsigned().null())
.col(ColumnDef::new(Market::OpensTill).unsigned().null())
.col(ColumnDef::new(Market::OpenTime).string().null())
.col(ColumnDef::new(Market::CloseTime).string().null())
.col(ColumnDef::new(Market::PreMarketOffset).unsigned().null())
.col(ColumnDef::new(Market::PostMarketOffset).unsigned().null())
.col(ColumnDef::new(Market::TimeZoneOffset).string().null())
.to_owned(),
)
.await

View File

@ -4,3 +4,4 @@ chain_width = 100
comment_width = 80
imports_indent = "Block"
imports_granularity = "One"
empty_item_single_line = true

View File

@ -0,0 +1,31 @@
[package]
name = "rustlers"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
name = "rustlers"
path = "src/lib.rs"
[dependencies]
# workspace
eyre = { workspace = true, default-features = false }
tokio-tungstenite = { workspace = true }
lool = { workspace = true, features = [
"logger",
"sched.tokio",
"sched.rule-recurrence",
"macros",
] }
# internal
entities = { path = "../entities" }
# external
chrono = "0.4.37"
async-trait = "0.1.79"
getset = "0.1.2"
[build-dependencies]
tonic-build = "0.11.0"

6
rustlers/src/lib.rs Normal file
View File

@ -0,0 +1,6 @@
pub mod rustlers;
pub mod rustlerjar;
pub mod svc;
use entities::{market, sea_orm::DatabaseConnection, ticker};

View File

@ -0,0 +1,82 @@
use entities::market;
use std::collections::HashMap;
use crate::rustlers::Rustler;
/// **🤠 » rustlerjar! macro**
///
/// A macro to create a `RustlerJar` with multiple Rustler instances and their corresponding
/// mappings.
///
/// **Usage**
///
/// ```rust
/// let rustler_jar = rustlerjar! {
/// "NYSE", "NASDAQ" => FooRustler,
/// "BINANCE" => BarRustler,
/// };
/// ```
#[macro_export]
macro_rules! rustlerjar {
($($($name:expr),* => $rustler:ident),* $(,)?) => {{
let mut instances: Vec<Box<dyn Rustler>> = Vec::new();
let mut mappings = std::collections::HashMap::new();
$(
let instance = Box::new($rustler::new());
$(
mappings.insert($name.to_string(), instance.name());
)*
instances.push(instance);
)*
$crate::rustlerjar::RustlerJar::new(instances, mappings)
}};
}
/// **🤠 » RustlerJar**
///
/// A `RustlerJar` is a collection of Rustlers and their corresponding mappings to the markets.
/// Which indicates which Rustler should be used for a given market. Rustlers are stored as
/// instances of `Box<dyn Rustler>`, and the mappings are stored as a `HashMap<String, String>` (
/// where the key is the market short name and the value is the Rustler name).
///
/// **Usage**
///
/// The easiest way to create a `RustlerJar` is by using the `rustlerjar!` macro.
/// ```rust
/// let rustler_jar = rustlerjar! {
/// "NYSE", "NASDAQ" => FooRustler,
/// "BINANCE" => BarRustler,
/// };
///
/// let rustler = rustler_jar.get(&market);
/// ```
pub struct RustlerJar {
rustlers: HashMap<String, Box<dyn Rustler>>,
mappings: HashMap<String, String>,
}
impl RustlerJar {
/// create a new `RustlerJar` with the given Rustlers and mappings.
///
/// **☢️ warn**: using the `rustlerjar!` macro is recommended
pub fn new(rustlers_list: Vec<Box<dyn Rustler>>, mappings: HashMap<String, String>) -> Self {
let mut rustlers = HashMap::new();
for rustler in rustlers_list {
rustlers.insert(rustler.name(), rustler);
}
Self { rustlers, mappings }
}
/// get the Rustler for the given market
pub fn get(&self, market: &market::Model) -> Option<&Box<dyn Rustler>> {
let key = self.get_key(market);
self.rustlers.get(key)
}
/// get the key from the mappings for the given market
fn get_key(&self, market: &market::Model) -> &str {
self.mappings.get(&market.short_name).unwrap()
}
}

330
rustlers/src/rustlers.rs Normal file
View File

@ -0,0 +1,330 @@
pub mod binance;
use {
async_trait::async_trait,
chrono::{DateTime, Local},
entities::{market, ticker},
std::collections::HashMap,
eyre::Result,
};
pub extern crate eyre;
pub extern crate chrono;
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum RustlerStatus {
Connecting,
Connected,
Disconnecting,
#[default]
Disconnected,
}
#[derive(Debug, Clone)]
pub enum MarketHourType {
Pre,
Regular,
Post,
Extended,
}
#[derive(Debug, Clone)]
pub struct Quote {
pub id: String,
pub market: String,
pub price: f64,
pub change_percent: Option<f64>,
pub time: Option<i64>,
pub market_hours: Option<MarketHourType>,
}
#[derive(Debug, Clone)]
pub struct RustlerOpts {
pub connect_on_start: bool,
pub connect_on_add: bool,
}
impl Default for RustlerOpts {
fn default() -> Self {
Self {
connect_on_start: true,
connect_on_add: true,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ScrapperCallbacks {
pub on_connected: Option<fn() -> Result<()>>,
pub on_disconnected: Option<fn() -> Result<()>>,
pub on_message: Option<fn(message: Quote) -> Result<()>>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Ticker {
pub symbol: String,
pub market: String,
}
impl Ticker {
pub fn from(t: &ticker::Model, m: &market::Model) -> Self {
Self {
symbol: t.symbol.clone(),
market: m.short_name.clone(),
}
}
}
pub trait RustlerAccessor {
// #region fields g&s
fn name(&self) -> String;
fn static_name() -> String
where
Self: Sized;
fn status(&self) -> &RustlerStatus;
fn set_status(&mut self, status: RustlerStatus) -> Result<()>;
fn next_run(&self) -> &DateTime<Local>;
fn set_next_run(&mut self, next_run: DateTime<Local>);
fn next_stop(&self) -> &Option<DateTime<Local>>;
fn set_next_stop(&mut self, next_stop: Option<DateTime<Local>>);
fn last_run(&self) -> &Option<DateTime<Local>>;
fn set_last_run(&mut self, last_run: Option<DateTime<Local>>);
fn last_stop(&self) -> &Option<DateTime<Local>>;
fn set_last_stop(&mut self, last_stop: Option<DateTime<Local>>);
fn last_update(&self) -> &Option<DateTime<Local>>;
fn set_last_update(&mut self, last_update: Option<DateTime<Local>>);
fn opts(&self) -> &RustlerOpts;
fn set_opts(&mut self, opts: RustlerOpts);
fn tickers(&self) -> &HashMap<String, Ticker>;
fn tickers_mut(&mut self) -> &mut HashMap<String, Ticker>;
fn set_tickers(&mut self, tickers: HashMap<String, Ticker>);
fn callbacks(&self) -> &Option<ScrapperCallbacks>;
fn set_callbacks(&mut self, callbacks: Option<ScrapperCallbacks>);
// #endregion
}
#[async_trait]
pub trait Rustler: RustlerAccessor + Send + Sync {
// #region Unimplemented trait functions
/// fn called after tickers are added to the rustler
fn on_add(&mut self, tickers: Vec<Ticker>) -> Result<()>;
/// fn called after tickers are deleted from the rustler
fn on_delete(&mut self, tickers: Vec<Ticker>) -> Result<()>;
/// connects the rustler to the data source
async fn connect(&mut self) -> Result<()>;
/// disconnects the rustler from the data source
async fn disconnect(&mut self) -> Result<()>;
// #endregion
/// should be called at construction time
async fn start(&mut self) -> Result<()> {
let opts = self.opts();
if opts.connect_on_start {
self.connect().await?;
}
Ok(())
}
/// updates last stop and last run times and calls the appropriate callback
///
/// should be called after the status of the rustler changes
fn handle_status_change(&mut self) -> Result<()> {
match self.status() {
RustlerStatus::Disconnected => {
self.set_last_stop(Some(Local::now()));
if let Some(callbacks) = self.callbacks() {
if let Some(on_disconnected) = callbacks.on_disconnected {
on_disconnected()?;
}
}
}
RustlerStatus::Connected => {
self.set_last_run(Some(Local::now()));
if let Some(callbacks) = self.callbacks() {
if let Some(on_connected) = callbacks.on_connected {
on_connected()?;
}
}
}
_ => {}
};
Ok(())
}
/// adds new tickers to the rustler
async fn add(&mut self, new_tickers: Vec<Ticker>) -> Result<()> {
let tickers = self.tickers_mut();
for new_ticker in &new_tickers {
// if the ticker already exists in the tickers map, skip it
if tickers.contains_key(&new_ticker.symbol) {
continue;
}
tickers.insert(new_ticker.symbol.clone(), new_ticker.clone());
}
if self.opts().connect_on_add {
// if disconnected, then connect the rustler
if self.status() == &RustlerStatus::Disconnected {
self.connect().await?;
}
}
self.on_add(new_tickers)?;
Ok(())
}
/// deletes tickers from the rustler
async fn delete(&mut self, new_tickers: Vec<Ticker>) -> Result<()> {
let tickers = self.tickers_mut();
for new_ticker in &new_tickers {
tickers.remove(&new_ticker.symbol);
}
// if after deleting the tickers the tickers map is
// empty, disconnect the rustler
if tickers.is_empty() {
self.disconnect().await?;
}
self.on_delete(new_tickers)?;
Ok(())
}
/// registers a new quote by passing it to the on_message callback
fn register_quote(&self, quote: Quote) -> Result<()> {
if let Some(callbacks) = self.callbacks() {
if let Some(on_message) = callbacks.on_message {
on_message(quote)?;
}
}
Ok(())
}
}
#[macro_export]
macro_rules! rustler_accessors {
(
$name:ident
) => {
fn name(&self) -> String {
stringify!($name).to_string()
}
fn static_name() -> String {
stringify!($name).to_string()
}
fn status(&self) -> &$crate::rustlers::RustlerStatus {
&self.status
}
fn set_status(&mut self, status: $crate::rustlers::RustlerStatus) -> $crate::rustlers::eyre::Result<()> {
self.status = status;
self.handle_status_change()?;
lool::logger::info!(
"Rustler {} status changed to {:?}",
self.name(),
self.status()
);
Ok(())
}
fn next_run(&self) -> &$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local> {
&self.next_run
}
fn set_next_run(&mut self, next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>) {
self.next_run = next_run;
}
fn next_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
&self.next_stop
}
fn set_next_stop(&mut self, next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
self.next_stop = next_stop;
}
fn last_run(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
&self.last_run
}
fn set_last_run(&mut self, last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
self.last_run = last_run;
}
fn last_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
&self.last_stop
}
fn set_last_stop(&mut self, last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
self.last_stop = last_stop;
}
fn last_update(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> {
&self.last_update
}
fn set_last_update(&mut self, last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) {
self.last_update = last_update;
}
fn opts(&self) -> &$crate::rustlers::RustlerOpts {
&self.opts
}
fn set_opts(&mut self, opts: $crate::rustlers::RustlerOpts) {
self.opts = opts;
}
fn tickers(&self) -> &HashMap<String, $crate::rustlers::Ticker> {
&self.tickers
}
fn tickers_mut(&mut self) -> &mut HashMap<String, $crate::rustlers::Ticker> {
&mut self.tickers
}
fn set_tickers(&mut self, tickers: HashMap<String, $crate::rustlers::Ticker>) {
self.tickers = tickers;
}
fn callbacks(&self) -> &Option<$crate::rustlers::ScrapperCallbacks> {
&self.callbacks
}
fn set_callbacks(&mut self, callbacks: Option<$crate::rustlers::ScrapperCallbacks>) {
self.callbacks = callbacks;
}
};
}
// Define the macro
#[macro_export]
macro_rules! rustler {
// Entry point for the macro, takes the struct definition
(
$(#[$outer:meta])*
$vis:vis struct $name:ident { $($fields:tt)* }
) => {
// Expand to the struct with derives and the fields
$(#[$outer])*
#[derive(Debug, Clone, Default)]
$vis struct $name {
status: $crate::rustlers::RustlerStatus,
next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>,
next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>,
opts: $crate::rustlers::RustlerOpts,
tickers: HashMap<String, $crate::rustlers::Ticker>,
callbacks: Option<$crate::rustlers::ScrapperCallbacks>,
$($fields)*
}
// Implement the RustlerAccessor trait for the struct
impl $crate::rustlers::RustlerAccessor for $name {
$crate::rustler_accessors!($name);
}
};
}

View File

@ -0,0 +1,65 @@
use {
crate::{
rustler,
rustlers::{
Rustler, RustlerAccessor, RustlerStatus, Ticker
},
},
async_trait::async_trait,
eyre::Result,
lool::{cli::stylize::Stylize, logger::info},
std::collections::HashMap,
};
const BINANCE_WSS_URL: &str = "wss://stream.binance.com:9443/stream";
// const MANUAL_CLOSE_CODE: u16 = 4663;
// const MANUAL_CLOSE_REASON: &str = "Manually Disconnected";
rustler!(
/// 🤠 » **binance rustler**
///
/// A rustler that steals quotes from Binance
pub struct BinanceRustler {}
);
impl BinanceRustler {
pub fn new() -> impl Rustler {
Self::default()
}
}
#[async_trait]
impl Rustler for BinanceRustler {
async fn connect(&mut self) -> Result<()> {
if self.status == RustlerStatus::Connected || self.status == RustlerStatus::Connecting {
return Ok(());
}
self.set_status(RustlerStatus::Connecting)?;
info!(
"Connecting to Binance WSS: {}",
BINANCE_WSS_URL.bright_green()
);
Ok(())
}
async fn disconnect(&mut self) -> Result<()> {
info!("Disconnecting from Binance WSS");
Ok(())
}
fn on_add(&mut self, tickers: Vec<Ticker>) -> Result<()> {
info!("Adding tickers: {:?}", tickers);
Ok(())
}
fn on_delete(&mut self, tickers: Vec<Ticker>) -> Result<()> {
info!("Deleting tickers: {:?}", tickers);
Ok(())
}
}

187
rustlers/src/svc.rs Normal file
View File

@ -0,0 +1,187 @@
use entities::{market, sea_orm::DatabaseConnection, ticker};
use eyre::{Ok, Result};
use lool::{
logger::{info, warn},
s,
sched::{
recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet,
SchedulingRule,
},
};
use std::collections::HashMap;
use crate::rustlers::{Rustler, Ticker};
// interface MarketExecData {
// entity: MarketModel;
// startJob?: Job;
// stopJob?: Job;
// }
// interface ScrapperData {
// markets?: MarketExecData[];
// // subscription?: Subscription
// }]]
struct MarketExecData {
// entity: MarketModel,
// startJob?: Job,
// stopJob?: Job,
}
type RustlerFactory = Box<dyn Send + Sync + FnMut(&market::Model) -> Option<Box<dyn Rustler>>>;
pub struct RustlersSvc {
market_svc: market::Service,
factory: RustlerFactory,
}
impl RustlersSvc {
pub async fn new(conn: DatabaseConnection) -> Self {
let market_svc = market::Service::new(conn).await;
let factory = |_mkt: &market::Model| None;
Self {
market_svc,
factory: Box::new(factory),
}
}
/// gets market data from the the database and starts
/// the corresponding rustler for each market
pub async fn start(&mut self) -> Result<()> {
info!("Starting rustlers");
let markets = self.market_svc.get_all_with_tickers().await?;
for (market, tickers) in markets {
self.start_rustler_for((market, tickers)).await?;
}
Ok(())
}
/// stops all rustlers and then starts them again
pub async fn restart(&self) -> Result<()> {
todo!()
}
/// gets the corresponding rustler for the given market and starts it
///
/// depending on the market configuraation, the rustler might be started
/// immediately or its start might be scheduled for a later time
///
/// this function also schedules the stop of the rustler at the end of the market
/// trading hours if the market is configured to stop at a specific time
async fn start_rustler_for(
&mut self,
market: (market::Model, Vec<ticker::Model>),
) -> Result<()> {
let (market, tickers) = market;
let rules = self.get_schedule_rules_for(&market)?;
let rustler = (self.factory)(&market);
if let Some(rustler) = rustler {
if let Some((start, end)) = rules {
let mut sched = Scheduler::new();
let start_name = format!("start-{}", market.short_name);
let end_name = format!("end-{}", market.short_name);
let start_job = sched.schedule(start_name, || async move {}, start).await;
let end_job = sched.schedule(end_name, || async move {}, end).await;
Ok(())
} else {
warn!("No schedule rules found for market '{}'", market.short_name);
Ok(())
}
} else {
warn!("No rustler found for market '{}'", market.short_name);
Ok(())
}
}
/// creates schedule rules for the given market
fn get_schedule_rules_for(
&self,
mkt: &market::Model,
) -> Result<Option<(SchedulingRule, SchedulingRule)>> {
if mkt.open_time.is_none() || mkt.close_time.is_none() {
return Ok(None);
}
let open_time = parse_time(&mkt.open_time.clone().unwrap())?;
let close_time = parse_time(&mkt.close_time.clone().unwrap())?;
let pre_offset = mkt.pre_market_offset.unwrap_or(0);
let post_offset = mkt.post_market_offset.unwrap_or(0);
let mut rule = ruleset();
rule.from_to_dow(mkt.opens_from.unwrap_or(0), mkt.opens_till.unwrap_or(0));
let start_rule = make_rule(&rule, open_time, pre_offset, Op::Sub);
let stop_rule = make_rule(&rule, close_time, post_offset, Op::Add);
Ok(Some((recur(&start_rule), recur(&stop_rule))))
}
/// get the rustler according to the market
fn get_rustler_for(&mut self, market: &market::Model) -> Option<Box<dyn Rustler>> {
let scrapper = (self.factory)(market);
scrapper
}
/// starts a rustler by adding the tickers to it
async fn start_rustler(
rustler: &mut Box<dyn Rustler>,
market: market::Model,
tickers: Vec<ticker::Model>,
) -> Result<()> {
if tickers.len() > 0 {
let tickers: Vec<Ticker> =
tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
rustler.add(tickers).await?;
}
Ok(())
}
/// stops a rustler by deleting all its tickers
async fn stop_rustler(
rustler: &mut Box<dyn Rustler>,
market: market::Model,
tickers: Vec<ticker::Model>,
) -> Result<()> {
if tickers.len() > 0 {
let tickers: Vec<Ticker> =
tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect();
rustler.delete(tickers).await?;
}
Ok(())
}
}
fn make_rule(
base: &RecurrenceRuleSet,
time: (u32, u32, u32),
offset: u32,
op: Op,
) -> RecurrenceRuleSet {
let (h, m, s) = time;
let h = match op {
Op::Add => h.saturating_add(offset),
Op::Sub => h.saturating_sub(offset),
};
let mut rule = base.clone();
rule.at_time(h, m, s);
rule
}
enum Op {
Add,
Sub,
}