diff --git a/lib/entities/orm/market.rs b/lib/entities/orm/market.rs index c16daf9..3812041 100644 --- a/lib/entities/orm/market.rs +++ b/lib/entities/orm/market.rs @@ -2,9 +2,9 @@ use sea_orm::entity::prelude::*; +/// 🐎 » market entity model #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "market")] -/// 🐎 » market entity model pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub id: String, diff --git a/lib/entities/orm/ticker.rs b/lib/entities/orm/ticker.rs index 7ce060e..6bca68e 100644 --- a/lib/entities/orm/ticker.rs +++ b/lib/entities/orm/ticker.rs @@ -2,9 +2,9 @@ use sea_orm::entity::prelude::*; +/// 🐎 » ticker entity model #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] #[sea_orm(table_name = "ticker")] -/// 🐎 » ticker entity model pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub id: String, diff --git a/lib/entities/services/ticker.rs b/lib/entities/services/ticker.rs index 68dc8d9..36a3fad 100644 --- a/lib/entities/services/ticker.rs +++ b/lib/entities/services/ticker.rs @@ -13,20 +13,24 @@ pub struct Service { } impl Service { + /// 🐎 » creates a new `Ticker` service pub async fn new(conn: DatabaseConnection) -> Self { Self { conn } } + /// 🐎 » retrieves all tickers from the database pub async fn get_all(&self) -> Result, DbErr> { let tickers = Ticker::find().all(&self.conn).await?; Ok(tickers) } + /// 🐎 » retrieves a ticker from the database, given its id pub async fn get(&self, id: String) -> Result, DbErr> { let ticker = Ticker::find_by_id(id).one(&self.conn).await?; Ok(ticker) } + /// 🐎 » retrieves a ticker from the database, given its symbol pub async fn get_by_symbol(&self, symbol: String) -> Result, DbErr> { let ticker = Ticker::find().filter(ticker::Column::Symbol.eq(symbol)).one(&self.conn).await?; @@ -34,6 +38,7 @@ impl Service { Ok(ticker) } + /// 🐎 » creates a new ticker in the database pub async fn create(&self, market: TickerModel) -> Result { Ticker::insert(market.clone().into_active_model()).exec(&self.conn).await?; Ok(market) diff --git a/lib/grpc/server.rs b/lib/grpc/server.rs index 147ef69..a8a1bb5 100644 --- a/lib/grpc/server.rs +++ b/lib/grpc/server.rs @@ -10,6 +10,7 @@ use { tonic::transport::Server, }; +/// the environment variable to set the gRPC server address const RUSTLER_GRPC_API_ADDR: &str = "RUSTLER_GRPC_API_ADDR"; /// 🐎 » starts the rustler gRPC server diff --git a/lib/grpc/services/market.rs b/lib/grpc/services/market.rs index 3afe318..59b3d15 100644 --- a/lib/grpc/services/market.rs +++ b/lib/grpc/services/market.rs @@ -16,6 +16,7 @@ pub mod market_mod { } impl Market { + /// 🐎 » converts a `Market` entity from gRPC to a database sea-orm `market::Model` fn into_model(self) -> market::Model { market::Model { id: self.id, @@ -31,6 +32,7 @@ impl Market { } } + /// 🐎 » converts a `market::Model` database entity to a gRPC `Market` entity fn from_model(model: market::Model) -> Self { Self { id: model.id, @@ -67,6 +69,7 @@ impl GrpcServer { #[tonic::async_trait] impl MarketApi for GrpcServer { + /// retrieves and returns a market entity from the database, given its id async fn get_all(&self, _: Request) -> Result, Status> { let start = Instant::now(); let result = self.svc.get_all().await; @@ -83,6 +86,7 @@ impl MarketApi for GrpcServer { response } + /// retrieves and returns all market entities from the database async fn create(&self, market: Request) -> Result, Status> { let start = Instant::now(); let mkt = market.into_inner().into_model(); diff --git a/lib/grpc/services/ticker.rs b/lib/grpc/services/ticker.rs index 9fff7d8..a612291 100644 --- a/lib/grpc/services/ticker.rs +++ b/lib/grpc/services/ticker.rs @@ -15,6 +15,7 @@ pub mod ticker_mod { } impl Ticker { + /// 🐎 » converts a `Ticker` entity from gRPC to a database sea-orm `ticker::Model` fn into_model(self) -> ticker::Model { ticker::Model { id: self.id, @@ -23,6 +24,7 @@ impl Ticker { } } + /// 🐎 » converts a `ticker::Model` database entity to a gRPC `Ticker` entity fn from_model(model: ticker::Model) -> Self { Self { id: model.id, @@ -52,6 +54,7 @@ impl GrpcServer { #[tonic::async_trait] impl TickerApi for GrpcServer { + /// retrieves and returns a ticker entity from the database, given its id async fn get(&self, req: Request) -> Result, Status> { let start = Instant::now(); let mkt = req.into_inner(); @@ -73,6 +76,7 @@ impl TickerApi for GrpcServer { response } + /// retrieves and returns all ticker entities from the database async fn get_all(&self, _: Request) -> Result, Status> { let start = Instant::now(); let result = self.svc.get_all().await; @@ -89,6 +93,7 @@ impl TickerApi for GrpcServer { response } + /// creates a new ticker entity in the database async fn create(&self, market: Request) -> Result, Status> { let start = Instant::now(); let mkt = market.into_inner().into_model(); diff --git a/lib/rustlers/bus/publish.rs b/lib/rustlers/bus/publish.rs index 2d95127..eaf3419 100644 --- a/lib/rustlers/bus/publish.rs +++ b/lib/rustlers/bus/publish.rs @@ -5,6 +5,8 @@ use { }; /// 🐎 » bus **Publisher** +/// +/// allows to push a message or resource to the bus #[derive(Clone)] pub struct Publisher { conn: MultiplexedConnection, diff --git a/lib/rustlers/bus/subscribe.rs b/lib/rustlers/bus/subscribe.rs index af6980a..c7c3fec 100644 --- a/lib/rustlers/bus/subscribe.rs +++ b/lib/rustlers/bus/subscribe.rs @@ -14,6 +14,8 @@ use { // https://github.com/exein-io/pulsar/blob/99ad35c8d13eaf1a37d7b6a9dcb812a5a1231d00/crates/pulsar-core/src/bus.rs /// 🐎 » bus **Subscriber** +/// +/// allows to subscribe to a redis key pattern and receive messages from the redis bus pub struct Subscriber { // TODO: replace with storing tokio multiplexed connection like in publish.rs when redis@0.26.0 // is released see https://github.com/redis-rs/redis-rs/issues/1137. @@ -50,17 +52,33 @@ impl Subscriber { }) } + /// 🐎 » set the pattern to subscribe to pub fn with_pattern(&mut self, pattern: &str) -> &mut Self { self.pattern = s!(pattern); self } + /// 🐎 » returns the pattern used to subscribe to the redis bus, including the prefix if set pub fn get_pattern(&self) -> String { key(self.get_prefix(), self.pattern.clone()) } - /// 🐎 » subscribe to a channel - pub async fn start_streaming(&mut self) -> Result<()> { + /// 🐎 » **stream** + /// + /// returns an `Observable` stream of messages from the redis bus + pub async fn stream(&mut self) -> Result> { + if self.subject.is_none() { + self.start_streaming().await?; + } + + match self.subject.as_ref() { + Some(subject) => Ok(subject.clone().box_it()), + None => fail!("Could not start streaming messages from redis bus"), + } + } + + /// subscribe to the redis feed + async fn start_streaming(&mut self) -> Result<()> { if self.subject.is_none() { self.subject = Some(SubjectThreads::default()); } @@ -99,15 +117,4 @@ impl Subscriber { Ok(()) } - - pub async fn stream(&mut self) -> Result> { - if self.subject.is_none() { - self.start_streaming().await?; - } - - match self.subject.as_ref() { - Some(subject) => Ok(subject.clone().box_it()), - None => fail!("Could not start streaming messages from redis bus"), - } - } } diff --git a/lib/rustlers/rustler.rs b/lib/rustlers/rustler.rs index ced2716..c7bfef5 100644 --- a/lib/rustlers/rustler.rs +++ b/lib/rustlers/rustler.rs @@ -14,6 +14,7 @@ use { }, }; +/// 🐎 » a struct representing the status of a rustler at a given time #[derive(Debug, Clone, PartialEq, Eq, Default)] pub enum RustlerStatus { Connecting, @@ -23,6 +24,7 @@ pub enum RustlerStatus { Disconnected, } +/// 🐎 » an enum representing the different types of market hours #[derive(Debug, Clone)] pub enum MarketHourType { Pre = 0, @@ -49,6 +51,8 @@ impl From for MarketHourType { } } +/// 🐎 » a struct storing a ticker's quote at a given time, and the change in price since the last +/// quote #[derive(Debug, Clone)] pub struct Quote { pub id: String, @@ -93,6 +97,9 @@ impl ToRedisKey for Quote { } impl ToFromRedisMessage for Quote { + /// 🐎 » converts a `Quote` to a serialized message that can be sent over a redis channel + /// + /// the message is in the format `id¦market¦price¦change_percent¦time¦market_hours` fn as_message(&self) -> String { // id¦market¦price¦change_percent¦time¦market_hours format!( @@ -349,6 +356,9 @@ pub trait Rustler: RustlerAccessor + Send + Sync { } } +/// macro that expands to the accessor functions for a `Rustler` struct +/// +/// for internal use #[macro_export] macro_rules! rustler_accessors { ( diff --git a/lib/rustlers/rustlerjar.rs b/lib/rustlers/rustlerjar.rs index d7232f9..c367e80 100644 --- a/lib/rustlers/rustlerjar.rs +++ b/lib/rustlers/rustlerjar.rs @@ -80,6 +80,7 @@ impl RustlerJar { self.rustlers.get(key) } + /// get the mutable Rustler for the given market as a mutable reference pub fn get_mut(&mut self, market: &market::Model) -> Option<&mut Arc>>> { let key = self.get_key(market).to_owned(); self.rustlers.get_mut(&key)