From 4fd13330cf8f2862bac371734b6d47bb8f70015a Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Thu, 18 Apr 2024 05:58:07 -0300 Subject: [PATCH] =?UTF-8?q?wip:=20=F0=9F=9A=A7=20core=20functionality?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/img/rustler.svg | 8 +- .vscode/settings.json | 6 +- Cargo.lock | 128 ++++++- Cargo.toml | 15 +- Taskfile.yaml | 5 + app/main.rs | 8 +- entities/Cargo.toml | 2 +- entities/src/lib.rs | 30 +- entities/src/market/svc.rs | 29 -- entities/src/{market/mod.rs => orm/market.rs} | 17 +- entities/src/{ => orm}/ticker.rs | 2 +- entities/src/services/market.rs | 37 ++ entities/src/services/ticker.rs | 40 +++ grpc/build.rs | 2 +- grpc/proto/market.proto | 14 +- grpc/proto/ticker.proto | 29 ++ grpc/src/lib.rs | 21 ++ grpc/src/server.rs | 101 +----- grpc/src/services/market.rs | 99 ++++++ grpc/src/services/ticker.rs | 105 ++++++ .../m20220101_000001_create_table_market.rs | 14 +- rustfmt.toml | 1 + rustlers/Cargo.toml | 31 ++ rustlers/src/lib.rs | 6 + rustlers/src/rustlerjar.rs | 82 +++++ rustlers/src/rustlers.rs | 330 ++++++++++++++++++ rustlers/src/rustlers/binance.rs | 65 ++++ rustlers/src/svc.rs | 187 ++++++++++ 28 files changed, 1238 insertions(+), 176 deletions(-) delete mode 100644 entities/src/market/svc.rs rename entities/src/{market/mod.rs => orm/market.rs} (70%) rename entities/src/{ => orm}/ticker.rs (94%) create mode 100644 entities/src/services/market.rs create mode 100644 entities/src/services/ticker.rs create mode 100644 grpc/proto/ticker.proto create mode 100644 grpc/src/services/market.rs create mode 100644 grpc/src/services/ticker.rs create mode 100644 rustlers/src/lib.rs create mode 100644 rustlers/src/rustlerjar.rs create mode 100644 rustlers/src/rustlers.rs create mode 100644 rustlers/src/rustlers/binance.rs create mode 100644 rustlers/src/svc.rs diff --git a/.github/img/rustler.svg b/.github/img/rustler.svg index fe6126a..3e148fd 100644 --- a/.github/img/rustler.svg +++ b/.github/img/rustler.svg @@ -6,10 +6,6 @@ } - - - - - - + + \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 00de050..18edc72 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,7 @@ { "files.exclude": { // 🧩 modules - "migration": true, + // "migration": true, // "entities": true, // ⚙️ @@ -24,12 +24,12 @@ // 🗑️ "check_size.py": true, "*.code-workspace": true, - // ".gitignore": true, + ".gitignore": true, ".vscode": true, ".git": true, // 🗄️ database - "rustler.db": true, + // "rustler.db": true, "rustler.db-shm": true, "rustler.db-wal": true, "rustler.db.bkp": true, diff --git a/Cargo.lock b/Cargo.lock index 67b1f78..955ae61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,9 +317,9 @@ checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" [[package]] name = "async-trait" -version = "0.1.78" +version = "0.1.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" +checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" dependencies = [ "proc-macro2", "quote", @@ -358,7 +358,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "hyper", "itoa", @@ -384,7 +384,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "mime", "rustversion", @@ -566,14 +566,16 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" [[package]] name = "chrono" -version = "0.4.35" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.4", ] @@ -703,6 +705,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "der" version = "0.7.8" @@ -1066,6 +1074,18 @@ dependencies = [ "wasi", ] +[[package]] +name = "getset" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "gimli" version = "0.28.1" @@ -1113,7 +1133,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -1214,6 +1234,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1221,7 +1252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -1248,13 +1279,13 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1470,13 +1501,17 @@ dependencies = [ [[package]] name = "lool" -version = "0.0.5" +version = "0.1.4" source = "sparse+http://lugit.local/api/packages/lucodear/cargo/" -checksum = "c727e0c375380178e0b15f8f54f249e5e2a9e72c20475e8a7807f4c72d253106" +checksum = "4320ce16557a892f40c0a776371ac0c3cda2c9312c83065a4ab2a1facf571539" dependencies = [ "bitflags 2.5.0", + "chrono", "eyre", "log", + "num-traits", + "tokio", + "tokio_schedule", ] [[package]] @@ -2258,14 +2293,30 @@ dependencies = [ name = "rustler" version = "0.0.0-alpha.0" dependencies = [ + "chrono", "dotenvy", "entities", "eyre", "grpc", "lool", + "rustlers", "tokio", ] +[[package]] +name = "rustlers" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "entities", + "eyre", + "getset", + "lool", + "tokio-tungstenite", + "tonic-build", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -3015,9 +3066,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -3062,6 +3113,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -3076,6 +3139,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "tokio_schedule" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b4a14ab1658c39d137ebcc5fbaab364c9922a6cc04ab48b364546c2e6022256" +dependencies = [ + "chrono", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.5" @@ -3105,7 +3178,7 @@ dependencies = [ "base64", "bytes", "h2", - "http", + "http 0.2.12", "http-body", "hyper", "hyper-timeout", @@ -3218,6 +3291,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3274,6 +3366,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 1272264..8d41e8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,21 +22,26 @@ name = "rustler" path = "app/main.rs" [workspace] -members = [".", "migration", "entities", "grpc"] +members = [".", "migration", "entities", "grpc", "rustlers"] [workspace.dependencies] -lool = { version = "0.0.5", registry = "lugit" } # crates: disable-check +lool = { version = "0.1.4", registry = "lugit" } # crates: disable-check eyre = { version = "0.6.12", default-features = false } +tokio-tungstenite = { version = "0.21.0" } [dependencies] # internal entities = { path = "entities" } grpc = { path = "grpc" } +rustlers = { path = "rustlers" } # workspace -eyre = { workspace = true, default-features = false, features = ["auto-install"] } -lool = { workspace = true, features = ["logger", "macros", "cli-stylize"] } +eyre = { workspace = true, default-features = false, features = [ + "auto-install", +] } +lool = { workspace = true } # external -tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] } dotenvy = "0.15.7" +chrono = "0.4.37" diff --git a/Taskfile.yaml b/Taskfile.yaml index 8d3c82c..4cec3b4 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -34,6 +34,11 @@ tasks: cmds: - cargo +nightly fmt --all + test: + desc: 🧪 test lool + cmds: + - cargo nextest run --all-features --workspace + lint: desc: 🧶 lint rustler cmds: diff --git a/app/main.rs b/app/main.rs index 7997f22..4507978 100644 --- a/app/main.rs +++ b/app/main.rs @@ -1,7 +1,7 @@ use { dotenvy::dotenv, eyre::Result, - lool::logger::{info, ConsoleLogger, Level}, + lool::logger::{info, ConsoleLogger, Level}, tokio::{join, select}, }; // TODO: here we will trigger the start of both the grpc server and the websocket gateway @@ -13,8 +13,12 @@ async fn main() -> Result<()> { dotenv()?; let conn = entities::db::get_connection().await?; + let mut rustler = rustlers::svc::RustlersSvc::new(conn.clone()).await; - grpc::server::start(conn.clone()).await?; + let (grpc_res, rustlers_res) = join! { + grpc::server::start(conn.clone()), + rustler.start(), + }; info!("Shutting down"); Ok(()) diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 037a431..a3ed977 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -11,7 +11,7 @@ path = "src/lib.rs" [dependencies] # common eyre = { workspace = true, default-features = false } -lool = { workspace = true } +lool = { workspace = true, features = [ "cli", "cli.stylize", "macros", "logger" ] } # external sea-orm = { version = "0.12.15", features = [ diff --git a/entities/src/lib.rs b/entities/src/lib.rs index ad52a1e..1d9a0a1 100644 --- a/entities/src/lib.rs +++ b/entities/src/lib.rs @@ -1,8 +1,28 @@ -pub mod market; -pub mod ticker; - pub use sea_orm; +mod orm { + #[path = "market.rs"] pub mod market; + #[path = "ticker.rs"] pub mod ticker; +} + +mod services { + #[path = "market.rs"] pub mod market; + #[path = "ticker.rs"] pub mod ticker; +} + +/// market entities and services +pub mod market { + pub use super::orm::market::*; + pub use super::services::market::*; +} + +/// ticker entities and services +pub mod ticker { + pub use super::orm::ticker::*; + pub use super::services::ticker::*; +} + +/// database connection stuff pub mod db { use { eyre::Result, @@ -25,14 +45,14 @@ pub mod db { conn_str } - pub async fn get_connection() -> Result> { + pub async fn get_connection() -> Result { let db_conn_str = std::env::var(RUSTLER_DATABASE).unwrap_or_else(|_| get_default_conn_str()); let mut conn_opts = ConnectOptions::new(db_conn_str.to_owned()); conn_opts.sqlx_logging(false); - let conn = Arc::new(Database::connect(conn_opts).await?); + let conn = Database::connect(conn_opts).await?; conn.query_one(Statement::from_string( DbBackend::Sqlite, diff --git a/entities/src/market/svc.rs b/entities/src/market/svc.rs deleted file mode 100644 index ba63b10..0000000 --- a/entities/src/market/svc.rs +++ /dev/null @@ -1,29 +0,0 @@ -use { - super::{Entity as MarketEntity, Model as MarketModel}, - eyre::Result, - sea_orm::{DatabaseConnection, EntityTrait, IntoActiveModel}, - std::{sync::Arc, time::Instant}, -}; - -pub struct Service { - conn: Arc, -} - -impl Service { - pub async fn new(conn: Arc) -> Self { - Self { conn } - } - - pub async fn get_all(&self) -> Result> { - // let conn = self.conn.unwrap(); - let markets = MarketEntity::find().all(&*self.conn).await?; - Ok(markets) - } - - pub async fn create(&self, market: MarketModel) -> Result { - let start = Instant::now(); - MarketEntity::insert(market.clone().into_active_model()).exec(&*self.conn).await?; - println!("insert market model took {:?}", start.elapsed()); - Ok(market) - } -} diff --git a/entities/src/market/mod.rs b/entities/src/orm/market.rs similarity index 70% rename from entities/src/market/mod.rs rename to entities/src/orm/market.rs index 3a277f5..402f366 100644 --- a/entities/src/market/mod.rs +++ b/entities/src/orm/market.rs @@ -1,7 +1,6 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 -mod svc; + use sea_orm::entity::prelude::*; -pub use svc::Service; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "market")] @@ -10,13 +9,13 @@ pub struct Model { pub id: String, pub short_name: String, pub full_name: String, - pub opens_from: i32, - pub opens_till: i32, - pub open_time: String, - pub close_time: String, - pub pre_market_offset: i32, - pub post_market_offset: i32, - pub time_zone_offset: i32, + pub opens_from: Option, + pub opens_till: Option, + pub open_time: Option, + pub close_time: Option, + pub pre_market_offset: Option, + pub post_market_offset: Option, + pub time_zone_offset: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/entities/src/ticker.rs b/entities/src/orm/ticker.rs similarity index 94% rename from entities/src/ticker.rs rename to entities/src/orm/ticker.rs index 2e3e94c..676ad4e 100644 --- a/entities/src/ticker.rs +++ b/entities/src/orm/ticker.rs @@ -29,4 +29,4 @@ impl Related for Entity { } } -impl ActiveModelBehavior for ActiveModel {} +impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file diff --git a/entities/src/services/market.rs b/entities/src/services/market.rs new file mode 100644 index 0000000..a04a8d3 --- /dev/null +++ b/entities/src/services/market.rs @@ -0,0 +1,37 @@ +use { + crate::{ + market::{Entity as Market, Model as MarketModel}, + ticker::{Entity as Ticker, Model as TickerModel}, + }, + eyre::Result, + sea_orm::{DatabaseConnection, DbErr, EntityTrait, IntoActiveModel} +}; + +pub struct Service { + conn: DatabaseConnection, +} + +impl Service { + pub async fn new(conn: DatabaseConnection) -> Self { + Self { conn } + } + + pub async fn get_all(&self) -> Result, DbErr> { + let markets = Market::find().all(&self.conn).await?; + Ok(markets) + } + + pub async fn create(&self, market: MarketModel) -> Result { + Market::insert(market.clone().into_active_model()).exec(&self.conn).await?; + Ok(market) + } + + pub async fn get_all_with_tickers( + &self, + ) -> Result)>, DbErr> { + let markets_with_tickers: Vec<(MarketModel, Vec)> = + Market::find().find_with_related(Ticker).all(&self.conn).await?; + + Ok(markets_with_tickers) + } +} diff --git a/entities/src/services/ticker.rs b/entities/src/services/ticker.rs new file mode 100644 index 0000000..bdf41d7 --- /dev/null +++ b/entities/src/services/ticker.rs @@ -0,0 +1,40 @@ +use { + crate::{ + orm::ticker, + ticker::{Entity as Ticker, Model as TickerModel}, + }, + eyre::Result, + sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, IntoActiveModel, QueryFilter} +}; + +pub struct Service { + conn: DatabaseConnection, +} + +impl Service { + pub async fn new(conn: DatabaseConnection) -> Self { + Self { conn } + } + + pub async fn get_all(&self) -> Result, DbErr> { + let tickers = Ticker::find().all(&self.conn).await?; + Ok(tickers) + } + + pub async fn get(&self, id: String) -> Result, DbErr> { + let ticker = Ticker::find_by_id(id).one(&self.conn).await?; + Ok(ticker) + } + + pub async fn get_by_symbol(&self, symbol: String) -> Result, DbErr> { + let ticker = + Ticker::find().filter(ticker::Column::Symbol.eq(symbol)).one(&self.conn).await?; + + Ok(ticker) + } + + pub async fn create(&self, market: TickerModel) -> Result { + Ticker::insert(market.clone().into_active_model()).exec(&self.conn).await?; + Ok(market) + } +} diff --git a/grpc/build.rs b/grpc/build.rs index 710dccd..2d85665 100644 --- a/grpc/build.rs +++ b/grpc/build.rs @@ -1,5 +1,5 @@ fn main() { - let proto_files = vec!["./proto/rustler.proto", "./proto/market.proto"]; + let proto_files = vec!["./proto/rustler.proto", "./proto/market.proto", "./proto/ticker.proto"]; for proto_file in proto_files { compile_proto(proto_file); diff --git a/grpc/proto/market.proto b/grpc/proto/market.proto index 6b7eab1..c72a099 100644 --- a/grpc/proto/market.proto +++ b/grpc/proto/market.proto @@ -13,13 +13,13 @@ message Market { string id = 1; string short_name = 2; string full_name = 3; - int32 opens_from = 4; - int32 opens_till = 5; - string open_time = 6; - string close_time = 7; - int32 pre_market_offset = 8; - int32 post_market_offset = 9; - int32 time_zone_offset = 10; + optional uint32 opens_from = 4; + optional uint32 opens_till = 5; + optional string open_time = 6; + optional string close_time = 7; + optional uint32 pre_market_offset = 8; + optional uint32 post_market_offset = 9; + optional string time_zone_offset = 10; } message Markets { diff --git a/grpc/proto/ticker.proto b/grpc/proto/ticker.proto new file mode 100644 index 0000000..cbc3bc1 --- /dev/null +++ b/grpc/proto/ticker.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package ticker; + +service TickerApi { + rpc GetAll (Empty) returns (Tickers) {} + rpc Create (Ticker) returns (Ticker) {} + rpc Get (TickerId) returns (Ticker) {} +} + +message TickerId { + string id = 1; +} + +message TickerSymbol { + string symbol = 1; +} + +message Empty { } + +message Ticker { + string id = 1; + string symbol = 2; + string market_id = 3; +} + +message Tickers { + repeated Ticker tickers = 1; +} diff --git a/grpc/src/lib.rs b/grpc/src/lib.rs index 74f47ad..193891e 100644 --- a/grpc/src/lib.rs +++ b/grpc/src/lib.rs @@ -1 +1,22 @@ +mod services { + use entities::sea_orm::{DbErr, SqlErr}; + + pub mod market; + pub mod ticker; + + pub(crate) fn handle_sql_err(err: DbErr, action: &str, entity_name: &str) -> tonic::Status { + let sqlerr = err.sql_err(); + + match sqlerr { + Some(SqlErr::UniqueConstraintViolation(_)) => { + tonic::Status::already_exists(format!("{} already exists", entity_name)) + } + Some(SqlErr::ForeignKeyConstraintViolation(_)) => { + tonic::Status::failed_precondition(format!("Related entity does not exist")) + } + _ => tonic::Status::internal(format!("Error {} {}", action, entity_name)), + } + } +} + pub mod server; diff --git a/grpc/src/server.rs b/grpc/src/server.rs index fd44659..1ddb0b1 100644 --- a/grpc/src/server.rs +++ b/grpc/src/server.rs @@ -1,93 +1,16 @@ use { - self::market_mod::Empty, - entities::{market, sea_orm::DatabaseConnection}, + crate::services, + entities::{market, sea_orm::DatabaseConnection, ticker}, eyre::Result, lool::{cli::stylize::Stylize, logger::info}, - market_mod::{ - market_api_server::{MarketApi, MarketApiServer}, - Market, Markets, - }, - std::{net::SocketAddr, sync::Arc, time::Instant}, - tonic::{transport::Server, Request, Response, Status}, + std::{net::SocketAddr, sync::Arc}, + tonic::transport::Server, }; -pub mod market_mod { - tonic::include_proto!("market"); -} - const RUSTLER_GRPC_API_ADDR: &str = "RUSTLER_GRPC_API_ADDR"; -impl Market { - fn into_model(self) -> market::Model { - market::Model { - id: self.id, - short_name: self.short_name, - full_name: self.full_name, - opens_from: self.opens_from, - opens_till: self.opens_till, - open_time: self.open_time, - close_time: self.close_time, - pre_market_offset: self.pre_market_offset, - post_market_offset: self.post_market_offset, - time_zone_offset: self.time_zone_offset, - } - } - - fn from_model(model: market::Model) -> Self { - Self { - id: model.id, - short_name: model.short_name, - full_name: model.full_name, - opens_from: model.opens_from, - opens_till: model.opens_till, - open_time: model.open_time, - close_time: model.close_time, - pre_market_offset: model.pre_market_offset, - post_market_offset: model.post_market_offset, - time_zone_offset: model.time_zone_offset, - } - } -} - -pub struct GrpcServer { - svc: market::Service, -} - -#[tonic::async_trait] -impl MarketApi for GrpcServer { - async fn get_all(&self, _: Request) -> Result, Status> { - let start = Instant::now(); - let response = if let Ok(mkts) = self.svc.get_all().await { - Ok(Response::new(Markets { - markets: mkts.into_iter().map(Market::from_model).collect(), - })) - } else { - Err(Status::internal("Failed to get markets")) - }; - - info!("`MarketApi.get_all` took {:?}", start.elapsed()); - - response - } - - async fn create(&self, market: Request) -> Result, Status> { - let start = Instant::now(); - let mkt = market.into_inner().into_model(); - - let response = if let Ok(m) = self.svc.create(mkt).await { - Ok(Response::new(Market::from_model(m))) - } else { - Err(Status::internal("Failed to create market")) - }; - - info!("`MarketApi.create` took {:?}", start.elapsed()); - - response - } -} - /// Starts the gRPC server -pub async fn start(conn: Arc) -> Result<()> { +pub async fn start(conn: DatabaseConnection) -> Result<()> { fn get_default_addr() -> String { let addr = "0.0.0.0:50051"; info!( @@ -101,14 +24,22 @@ pub async fn start(conn: Arc) -> Result<()> { let addr: SocketAddr = std::env::var(RUSTLER_GRPC_API_ADDR).unwrap_or_else(|_| get_default_addr()).parse()?; - let svc = market::Service::new(conn).await; - let server = GrpcServer { svc }; + let market_db = market::Service::new(conn.clone()).await; + let ticker_db = ticker::Service::new(conn.clone()).await; + + let market_grpc = services::market::GrpcServer { svc: market_db }; + let ticker_grpc = services::ticker::GrpcServer { svc: ticker_db }; info!( "🎉 gRPC server listening on {}", addr.clone().to_string().green() ); - Server::builder().add_service(MarketApiServer::new(server)).serve(addr).await?; + + Server::builder() + .add_service(market_grpc.svc()) + .add_service(ticker_grpc.svc()) + .serve(addr) + .await?; Ok(()) } diff --git a/grpc/src/services/market.rs b/grpc/src/services/market.rs new file mode 100644 index 0000000..dcd4d84 --- /dev/null +++ b/grpc/src/services/market.rs @@ -0,0 +1,99 @@ +use { + self::market_mod::Empty, + crate::services::handle_sql_err, + entities::market, + eyre::Result, + lool::logger::{error, info}, + market_mod::{ + market_api_server::{MarketApi, MarketApiServer}, + Market, Markets, + }, + std::{any::Any, fmt::Debug, time::Instant}, + tonic::{Request, Response, Status}, +}; + +pub mod market_mod { + tonic::include_proto!("market"); +} + +impl Market { + fn into_model(self) -> market::Model { + market::Model { + id: self.id, + short_name: self.short_name, + full_name: self.full_name, + opens_from: self.opens_from, + opens_till: self.opens_till, + open_time: self.open_time, + close_time: self.close_time, + pre_market_offset: self.pre_market_offset, + post_market_offset: self.post_market_offset, + time_zone_offset: self.time_zone_offset, + } + } + + fn from_model(model: market::Model) -> Self { + Self { + id: model.id, + short_name: model.short_name, + full_name: model.full_name, + opens_from: model.opens_from, + opens_till: model.opens_till, + open_time: model.open_time, + close_time: model.close_time, + pre_market_offset: model.pre_market_offset, + post_market_offset: model.post_market_offset, + time_zone_offset: model.time_zone_offset, + } + } +} + +pub struct GrpcServer { + pub(crate) svc: market::Service, +} + +impl GrpcServer { + pub fn log_if_err(&self, res: &Result) { + if let Err(err) = &res { + error!("{:?}", err); + } + } + + pub fn svc(self) -> MarketApiServer { + MarketApiServer::new(self) + } +} + +#[tonic::async_trait] +impl MarketApi for GrpcServer { + async fn get_all(&self, _: Request) -> Result, Status> { + let start = Instant::now(); + let result = self.svc.get_all().await; + self.log_if_err(&result); + + let response = match result { + Ok(mkts) => Ok(Response::new(Markets { + markets: mkts.to_owned().into_iter().map(Market::from_model).collect(), + })), + Err(err) => Err(handle_sql_err(err, "Getting", "markets")), + }; + + info!("`MarketApi.get_all` took {:?}", start.elapsed()); + response + } + + async fn create(&self, market: Request) -> Result, Status> { + let start = Instant::now(); + let mkt = market.into_inner().into_model(); + let result = self.svc.create(mkt).await; + self.log_if_err(&result); + + let response = match result { + Ok(m) => Ok(Response::new(Market::from_model(m))), + Err(err) => Err(handle_sql_err(err, "creating", "market")), + }; + + info!("`MarketApi.create` took {:?}", start.elapsed()); + response + } +} diff --git a/grpc/src/services/ticker.rs b/grpc/src/services/ticker.rs new file mode 100644 index 0000000..98084fa --- /dev/null +++ b/grpc/src/services/ticker.rs @@ -0,0 +1,105 @@ +use { + crate::services::handle_sql_err, + entities::ticker, + eyre::Result, + lool::logger::{error, info}, + std::{any::Any, fmt::Debug, time::Instant}, + ticker_mod::{ + ticker_api_server::{TickerApi, TickerApiServer}, + Empty, Ticker, TickerId, Tickers, + }, + tonic::{Request, Response, Status}, +}; + +pub mod ticker_mod { + tonic::include_proto!("ticker"); +} + +impl Ticker { + fn into_model(self) -> ticker::Model { + ticker::Model { + id: self.id, + symbol: self.symbol, + market_id: self.market_id, + } + } + + fn from_model(model: ticker::Model) -> Self { + Self { + id: model.id, + symbol: model.symbol, + market_id: model.market_id, + } + } +} + +pub struct GrpcServer { + pub(crate) svc: ticker::Service, +} + +impl GrpcServer { + pub fn log_if_err(&self, res: &Result) { + if let Err(err) = &res { + error!("{:?}", err); + } + } + + pub fn svc(self) -> TickerApiServer { + TickerApiServer::new(self) + } +} + +#[tonic::async_trait] +impl TickerApi for GrpcServer { + async fn get(&self, req: Request) -> Result, Status> { + let start = Instant::now(); + let mkt = req.into_inner(); + let result = self.svc.get(mkt.id).await; + self.log_if_err(&result); + + let response = match result { + Ok(m) => { + if let Some(m) = m { + Ok(Response::new(Ticker::from_model(m))) + } else { + Err(Status::not_found("Ticker not found")) + } + } + Err(err) => Err(handle_sql_err(err, "Getting", "ticker")), + }; + + info!("`TickerApi.get` took {:?}", start.elapsed()); + response + } + + async fn get_all(&self, _: Request) -> Result, Status> { + let start = Instant::now(); + let result = self.svc.get_all().await; + self.log_if_err(&result); + + let response = match result { + Ok(mkts) => Ok(Response::new(Tickers { + tickers: mkts.to_owned().into_iter().map(Ticker::from_model).collect(), + })), + Err(err) => Err(handle_sql_err(err, "Getting", "tickers")), + }; + + info!("`TickerApi.get_all` took {:?}", start.elapsed()); + response + } + + async fn create(&self, market: Request) -> Result, Status> { + let start = Instant::now(); + let mkt = market.into_inner().into_model(); + let result = self.svc.create(mkt).await; + self.log_if_err(&result); + + let response = match result { + Ok(m) => Ok(Response::new(Ticker::from_model(m))), + Err(err) => Err(handle_sql_err(err, "creating", "ticker")), + }; + + info!("`TickerApi.create` took {:?}", start.elapsed()); + response + } +} diff --git a/migration/src/m20220101_000001_create_table_market.rs b/migration/src/m20220101_000001_create_table_market.rs index ac24ca0..45c2c2f 100644 --- a/migration/src/m20220101_000001_create_table_market.rs +++ b/migration/src/m20220101_000001_create_table_market.rs @@ -14,13 +14,13 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Market::Id).string().not_null().primary_key()) .col(ColumnDef::new(Market::ShortName).string().not_null()) .col(ColumnDef::new(Market::FullName).string().not_null()) - .col(ColumnDef::new(Market::OpensFrom).integer().not_null()) - .col(ColumnDef::new(Market::OpensTill).integer().not_null()) - .col(ColumnDef::new(Market::OpenTime).string().not_null()) - .col(ColumnDef::new(Market::CloseTime).string().not_null()) - .col(ColumnDef::new(Market::PreMarketOffset).integer().not_null()) - .col(ColumnDef::new(Market::PostMarketOffset).integer().not_null()) - .col(ColumnDef::new(Market::TimeZoneOffset).integer().not_null()) + .col(ColumnDef::new(Market::OpensFrom).unsigned().null()) + .col(ColumnDef::new(Market::OpensTill).unsigned().null()) + .col(ColumnDef::new(Market::OpenTime).string().null()) + .col(ColumnDef::new(Market::CloseTime).string().null()) + .col(ColumnDef::new(Market::PreMarketOffset).unsigned().null()) + .col(ColumnDef::new(Market::PostMarketOffset).unsigned().null()) + .col(ColumnDef::new(Market::TimeZoneOffset).string().null()) .to_owned(), ) .await diff --git a/rustfmt.toml b/rustfmt.toml index f50e61a..76ddffc 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -4,3 +4,4 @@ chain_width = 100 comment_width = 80 imports_indent = "Block" imports_granularity = "One" +empty_item_single_line = true diff --git a/rustlers/Cargo.toml b/rustlers/Cargo.toml index e69de29..cd0587e 100644 --- a/rustlers/Cargo.toml +++ b/rustlers/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "rustlers" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +name = "rustlers" +path = "src/lib.rs" + +[dependencies] +# workspace +eyre = { workspace = true, default-features = false } +tokio-tungstenite = { workspace = true } +lool = { workspace = true, features = [ + "logger", + "sched.tokio", + "sched.rule-recurrence", + "macros", +] } + +# internal +entities = { path = "../entities" } + +# external +chrono = "0.4.37" +async-trait = "0.1.79" +getset = "0.1.2" + +[build-dependencies] +tonic-build = "0.11.0" diff --git a/rustlers/src/lib.rs b/rustlers/src/lib.rs new file mode 100644 index 0000000..0b463dc --- /dev/null +++ b/rustlers/src/lib.rs @@ -0,0 +1,6 @@ +pub mod rustlers; +pub mod rustlerjar; +pub mod svc; + +use entities::{market, sea_orm::DatabaseConnection, ticker}; + diff --git a/rustlers/src/rustlerjar.rs b/rustlers/src/rustlerjar.rs new file mode 100644 index 0000000..1a17438 --- /dev/null +++ b/rustlers/src/rustlerjar.rs @@ -0,0 +1,82 @@ +use entities::market; +use std::collections::HashMap; +use crate::rustlers::Rustler; + +/// **🤠 » rustlerjar! macro** +/// +/// A macro to create a `RustlerJar` with multiple Rustler instances and their corresponding +/// mappings. +/// +/// **Usage** +/// +/// ```rust +/// let rustler_jar = rustlerjar! { +/// "NYSE", "NASDAQ" => FooRustler, +/// "BINANCE" => BarRustler, +/// }; +/// ``` +#[macro_export] +macro_rules! rustlerjar { + ($($($name:expr),* => $rustler:ident),* $(,)?) => {{ + let mut instances: Vec> = Vec::new(); + let mut mappings = std::collections::HashMap::new(); + + $( + let instance = Box::new($rustler::new()); + $( + mappings.insert($name.to_string(), instance.name()); + )* + instances.push(instance); + )* + + $crate::rustlerjar::RustlerJar::new(instances, mappings) + }}; +} + +/// **🤠 » RustlerJar** +/// +/// A `RustlerJar` is a collection of Rustlers and their corresponding mappings to the markets. +/// Which indicates which Rustler should be used for a given market. Rustlers are stored as +/// instances of `Box`, and the mappings are stored as a `HashMap` ( +/// where the key is the market short name and the value is the Rustler name). +/// +/// **Usage** +/// +/// The easiest way to create a `RustlerJar` is by using the `rustlerjar!` macro. +/// ```rust +/// let rustler_jar = rustlerjar! { +/// "NYSE", "NASDAQ" => FooRustler, +/// "BINANCE" => BarRustler, +/// }; +/// +/// let rustler = rustler_jar.get(&market); +/// ``` +pub struct RustlerJar { + rustlers: HashMap>, + mappings: HashMap, +} + +impl RustlerJar { + /// create a new `RustlerJar` with the given Rustlers and mappings. + /// + /// **☢️ warn**: using the `rustlerjar!` macro is recommended + pub fn new(rustlers_list: Vec>, mappings: HashMap) -> Self { + let mut rustlers = HashMap::new(); + for rustler in rustlers_list { + rustlers.insert(rustler.name(), rustler); + } + + Self { rustlers, mappings } + } + + /// get the Rustler for the given market + pub fn get(&self, market: &market::Model) -> Option<&Box> { + let key = self.get_key(market); + self.rustlers.get(key) + } + + /// get the key from the mappings for the given market + fn get_key(&self, market: &market::Model) -> &str { + self.mappings.get(&market.short_name).unwrap() + } +} diff --git a/rustlers/src/rustlers.rs b/rustlers/src/rustlers.rs new file mode 100644 index 0000000..2599d1e --- /dev/null +++ b/rustlers/src/rustlers.rs @@ -0,0 +1,330 @@ +pub mod binance; + +use { + async_trait::async_trait, + chrono::{DateTime, Local}, + entities::{market, ticker}, + std::collections::HashMap, + eyre::Result, +}; + +pub extern crate eyre; +pub extern crate chrono; + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum RustlerStatus { + Connecting, + Connected, + Disconnecting, + #[default] + Disconnected, +} + +#[derive(Debug, Clone)] +pub enum MarketHourType { + Pre, + Regular, + Post, + Extended, +} + +#[derive(Debug, Clone)] +pub struct Quote { + pub id: String, + pub market: String, + pub price: f64, + pub change_percent: Option, + pub time: Option, + pub market_hours: Option, +} + +#[derive(Debug, Clone)] +pub struct RustlerOpts { + pub connect_on_start: bool, + pub connect_on_add: bool, +} + +impl Default for RustlerOpts { + fn default() -> Self { + Self { + connect_on_start: true, + connect_on_add: true, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct ScrapperCallbacks { + pub on_connected: Option Result<()>>, + pub on_disconnected: Option Result<()>>, + pub on_message: Option Result<()>>, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Ticker { + pub symbol: String, + pub market: String, +} + +impl Ticker { + pub fn from(t: &ticker::Model, m: &market::Model) -> Self { + Self { + symbol: t.symbol.clone(), + market: m.short_name.clone(), + } + } +} + +pub trait RustlerAccessor { + // #region fields g&s + fn name(&self) -> String; + + fn static_name() -> String + where + Self: Sized; + + fn status(&self) -> &RustlerStatus; + fn set_status(&mut self, status: RustlerStatus) -> Result<()>; + + fn next_run(&self) -> &DateTime; + fn set_next_run(&mut self, next_run: DateTime); + + fn next_stop(&self) -> &Option>; + fn set_next_stop(&mut self, next_stop: Option>); + + fn last_run(&self) -> &Option>; + fn set_last_run(&mut self, last_run: Option>); + + fn last_stop(&self) -> &Option>; + fn set_last_stop(&mut self, last_stop: Option>); + + fn last_update(&self) -> &Option>; + fn set_last_update(&mut self, last_update: Option>); + + fn opts(&self) -> &RustlerOpts; + fn set_opts(&mut self, opts: RustlerOpts); + + fn tickers(&self) -> &HashMap; + fn tickers_mut(&mut self) -> &mut HashMap; + fn set_tickers(&mut self, tickers: HashMap); + + fn callbacks(&self) -> &Option; + fn set_callbacks(&mut self, callbacks: Option); + // #endregion +} + +#[async_trait] +pub trait Rustler: RustlerAccessor + Send + Sync { + // #region Unimplemented trait functions + /// fn called after tickers are added to the rustler + fn on_add(&mut self, tickers: Vec) -> Result<()>; + /// fn called after tickers are deleted from the rustler + fn on_delete(&mut self, tickers: Vec) -> Result<()>; + /// connects the rustler to the data source + async fn connect(&mut self) -> Result<()>; + /// disconnects the rustler from the data source + async fn disconnect(&mut self) -> Result<()>; + // #endregion + + /// should be called at construction time + async fn start(&mut self) -> Result<()> { + let opts = self.opts(); + if opts.connect_on_start { + self.connect().await?; + } + Ok(()) + } + + /// updates last stop and last run times and calls the appropriate callback + /// + /// should be called after the status of the rustler changes + fn handle_status_change(&mut self) -> Result<()> { + match self.status() { + RustlerStatus::Disconnected => { + self.set_last_stop(Some(Local::now())); + + if let Some(callbacks) = self.callbacks() { + if let Some(on_disconnected) = callbacks.on_disconnected { + on_disconnected()?; + } + } + } + RustlerStatus::Connected => { + self.set_last_run(Some(Local::now())); + + if let Some(callbacks) = self.callbacks() { + if let Some(on_connected) = callbacks.on_connected { + on_connected()?; + } + } + } + _ => {} + }; + + Ok(()) + } + + /// adds new tickers to the rustler + async fn add(&mut self, new_tickers: Vec) -> Result<()> { + let tickers = self.tickers_mut(); + + for new_ticker in &new_tickers { + // if the ticker already exists in the tickers map, skip it + if tickers.contains_key(&new_ticker.symbol) { + continue; + } + + tickers.insert(new_ticker.symbol.clone(), new_ticker.clone()); + } + + if self.opts().connect_on_add { + // if disconnected, then connect the rustler + if self.status() == &RustlerStatus::Disconnected { + self.connect().await?; + } + } + + self.on_add(new_tickers)?; + Ok(()) + } + + /// deletes tickers from the rustler + async fn delete(&mut self, new_tickers: Vec) -> Result<()> { + let tickers = self.tickers_mut(); + + for new_ticker in &new_tickers { + tickers.remove(&new_ticker.symbol); + } + + // if after deleting the tickers the tickers map is + // empty, disconnect the rustler + if tickers.is_empty() { + self.disconnect().await?; + } + + self.on_delete(new_tickers)?; + Ok(()) + } + + /// registers a new quote by passing it to the on_message callback + fn register_quote(&self, quote: Quote) -> Result<()> { + if let Some(callbacks) = self.callbacks() { + if let Some(on_message) = callbacks.on_message { + on_message(quote)?; + } + } + + Ok(()) + } +} + +#[macro_export] +macro_rules! rustler_accessors { + ( + $name:ident + ) => { + fn name(&self) -> String { + stringify!($name).to_string() + } + fn static_name() -> String { + stringify!($name).to_string() + } + fn status(&self) -> &$crate::rustlers::RustlerStatus { + &self.status + } + fn set_status(&mut self, status: $crate::rustlers::RustlerStatus) -> $crate::rustlers::eyre::Result<()> { + self.status = status; + self.handle_status_change()?; + + lool::logger::info!( + "Rustler {} status changed to {:?}", + self.name(), + self.status() + ); + + Ok(()) + } + fn next_run(&self) -> &$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local> { + &self.next_run + } + fn set_next_run(&mut self, next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>) { + self.next_run = next_run; + } + fn next_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + &self.next_stop + } + fn set_next_stop(&mut self, next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + self.next_stop = next_stop; + } + fn last_run(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + &self.last_run + } + fn set_last_run(&mut self, last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + self.last_run = last_run; + } + fn last_stop(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + &self.last_stop + } + fn set_last_stop(&mut self, last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + self.last_stop = last_stop; + } + fn last_update(&self) -> &Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>> { + &self.last_update + } + fn set_last_update(&mut self, last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>) { + self.last_update = last_update; + } + fn opts(&self) -> &$crate::rustlers::RustlerOpts { + &self.opts + } + fn set_opts(&mut self, opts: $crate::rustlers::RustlerOpts) { + self.opts = opts; + } + fn tickers(&self) -> &HashMap { + &self.tickers + } + fn tickers_mut(&mut self) -> &mut HashMap { + &mut self.tickers + } + fn set_tickers(&mut self, tickers: HashMap) { + self.tickers = tickers; + } + fn callbacks(&self) -> &Option<$crate::rustlers::ScrapperCallbacks> { + &self.callbacks + } + fn set_callbacks(&mut self, callbacks: Option<$crate::rustlers::ScrapperCallbacks>) { + self.callbacks = callbacks; + } + }; +} + +// Define the macro +#[macro_export] +macro_rules! rustler { + // Entry point for the macro, takes the struct definition + ( + $(#[$outer:meta])* + $vis:vis struct $name:ident { $($fields:tt)* } + ) => { + // Expand to the struct with derives and the fields + $(#[$outer])* + #[derive(Debug, Clone, Default)] + $vis struct $name { + status: $crate::rustlers::RustlerStatus, + next_run: $crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>, + next_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + last_run: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + last_stop: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + last_update: Option<$crate::rustlers::chrono::DateTime<$crate::rustlers::chrono::Local>>, + opts: $crate::rustlers::RustlerOpts, + tickers: HashMap, + callbacks: Option<$crate::rustlers::ScrapperCallbacks>, + $($fields)* + } + + // Implement the RustlerAccessor trait for the struct + impl $crate::rustlers::RustlerAccessor for $name { + $crate::rustler_accessors!($name); + } + }; +} diff --git a/rustlers/src/rustlers/binance.rs b/rustlers/src/rustlers/binance.rs new file mode 100644 index 0000000..28c15f2 --- /dev/null +++ b/rustlers/src/rustlers/binance.rs @@ -0,0 +1,65 @@ +use { + crate::{ + rustler, + rustlers::{ + Rustler, RustlerAccessor, RustlerStatus, Ticker + }, + }, + async_trait::async_trait, + eyre::Result, + lool::{cli::stylize::Stylize, logger::info}, + std::collections::HashMap, +}; + +const BINANCE_WSS_URL: &str = "wss://stream.binance.com:9443/stream"; +// const MANUAL_CLOSE_CODE: u16 = 4663; +// const MANUAL_CLOSE_REASON: &str = "Manually Disconnected"; + +rustler!( + /// 🤠 » **binance rustler** + /// + /// A rustler that steals quotes from Binance + pub struct BinanceRustler {} +); + +impl BinanceRustler { + pub fn new() -> impl Rustler { + Self::default() + } +} + +#[async_trait] +impl Rustler for BinanceRustler { + async fn connect(&mut self) -> Result<()> { + if self.status == RustlerStatus::Connected || self.status == RustlerStatus::Connecting { + return Ok(()); + } + + self.set_status(RustlerStatus::Connecting)?; + + info!( + "Connecting to Binance WSS: {}", + BINANCE_WSS_URL.bright_green() + ); + + Ok(()) + } + + async fn disconnect(&mut self) -> Result<()> { + info!("Disconnecting from Binance WSS"); + + Ok(()) + } + + fn on_add(&mut self, tickers: Vec) -> Result<()> { + info!("Adding tickers: {:?}", tickers); + + Ok(()) + } + + fn on_delete(&mut self, tickers: Vec) -> Result<()> { + info!("Deleting tickers: {:?}", tickers); + + Ok(()) + } +} diff --git a/rustlers/src/svc.rs b/rustlers/src/svc.rs new file mode 100644 index 0000000..b2639ab --- /dev/null +++ b/rustlers/src/svc.rs @@ -0,0 +1,187 @@ +use entities::{market, sea_orm::DatabaseConnection, ticker}; +use eyre::{Ok, Result}; +use lool::{ + logger::{info, warn}, + s, + sched::{ + recur, ruleset, scheduler::tokio::Scheduler, utils::parse_time, RecurrenceRuleSet, + SchedulingRule, + }, +}; +use std::collections::HashMap; + +use crate::rustlers::{Rustler, Ticker}; + +// interface MarketExecData { +// entity: MarketModel; +// startJob?: Job; +// stopJob?: Job; +// } + +// interface ScrapperData { +// markets?: MarketExecData[]; +// // subscription?: Subscription +// }]] + +struct MarketExecData { + // entity: MarketModel, + // startJob?: Job, + // stopJob?: Job, +} + +type RustlerFactory = Box Option>>; + +pub struct RustlersSvc { + market_svc: market::Service, + factory: RustlerFactory, +} + +impl RustlersSvc { + pub async fn new(conn: DatabaseConnection) -> Self { + let market_svc = market::Service::new(conn).await; + let factory = |_mkt: &market::Model| None; + + Self { + market_svc, + factory: Box::new(factory), + } + } + + /// gets market data from the the database and starts + /// the corresponding rustler for each market + pub async fn start(&mut self) -> Result<()> { + info!("Starting rustlers"); + let markets = self.market_svc.get_all_with_tickers().await?; + + for (market, tickers) in markets { + self.start_rustler_for((market, tickers)).await?; + } + + Ok(()) + } + + /// stops all rustlers and then starts them again + pub async fn restart(&self) -> Result<()> { + todo!() + } + + /// gets the corresponding rustler for the given market and starts it + /// + /// depending on the market configuraation, the rustler might be started + /// immediately or its start might be scheduled for a later time + /// + /// this function also schedules the stop of the rustler at the end of the market + /// trading hours if the market is configured to stop at a specific time + async fn start_rustler_for( + &mut self, + market: (market::Model, Vec), + ) -> Result<()> { + let (market, tickers) = market; + let rules = self.get_schedule_rules_for(&market)?; + let rustler = (self.factory)(&market); + + if let Some(rustler) = rustler { + if let Some((start, end)) = rules { + let mut sched = Scheduler::new(); + let start_name = format!("start-{}", market.short_name); + let end_name = format!("end-{}", market.short_name); + + let start_job = sched.schedule(start_name, || async move {}, start).await; + let end_job = sched.schedule(end_name, || async move {}, end).await; + + Ok(()) + } else { + warn!("No schedule rules found for market '{}'", market.short_name); + Ok(()) + } + } else { + warn!("No rustler found for market '{}'", market.short_name); + Ok(()) + } + } + + /// creates schedule rules for the given market + fn get_schedule_rules_for( + &self, + mkt: &market::Model, + ) -> Result> { + if mkt.open_time.is_none() || mkt.close_time.is_none() { + return Ok(None); + } + + let open_time = parse_time(&mkt.open_time.clone().unwrap())?; + let close_time = parse_time(&mkt.close_time.clone().unwrap())?; + let pre_offset = mkt.pre_market_offset.unwrap_or(0); + let post_offset = mkt.post_market_offset.unwrap_or(0); + + let mut rule = ruleset(); + rule.from_to_dow(mkt.opens_from.unwrap_or(0), mkt.opens_till.unwrap_or(0)); + + let start_rule = make_rule(&rule, open_time, pre_offset, Op::Sub); + let stop_rule = make_rule(&rule, close_time, post_offset, Op::Add); + + Ok(Some((recur(&start_rule), recur(&stop_rule)))) + } + + /// get the rustler according to the market + fn get_rustler_for(&mut self, market: &market::Model) -> Option> { + let scrapper = (self.factory)(market); + + + scrapper + } + + /// starts a rustler by adding the tickers to it + async fn start_rustler( + rustler: &mut Box, + market: market::Model, + tickers: Vec, + ) -> Result<()> { + if tickers.len() > 0 { + let tickers: Vec = + tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); + + rustler.add(tickers).await?; + } + + Ok(()) + } + + /// stops a rustler by deleting all its tickers + async fn stop_rustler( + rustler: &mut Box, + market: market::Model, + tickers: Vec, + ) -> Result<()> { + if tickers.len() > 0 { + let tickers: Vec = + tickers.into_iter().map(|t| Ticker::from(&t, &market)).collect(); + + rustler.delete(tickers).await?; + } + + Ok(()) + } +} + +fn make_rule( + base: &RecurrenceRuleSet, + time: (u32, u32, u32), + offset: u32, + op: Op, +) -> RecurrenceRuleSet { + let (h, m, s) = time; + let h = match op { + Op::Add => h.saturating_add(offset), + Op::Sub => h.saturating_sub(offset), + }; + + let mut rule = base.clone(); + rule.at_time(h, m, s); + rule +} + +enum Op { + Add, + Sub, +}