From 3393edfd90a19d0f587a979bc61f6217746bca3f Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Tue, 4 Jun 2024 10:43:44 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20make=20SourceStream=20more?= =?UTF-8?q?=20reusable?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/bus/mod.rs | 3 ++- lib/bus/redis/stream/mod.rs | 9 +++++---- lib/rustlers/rustler.rs | 3 ++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/bus/mod.rs b/lib/bus/mod.rs index d5e6fa2..9aed019 100644 --- a/lib/bus/mod.rs +++ b/lib/bus/mod.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, pin::Pin}; use futures::Stream; +use redis::stream::StreamMsg; use {eyre::Result, tonic::async_trait}; @@ -24,7 +25,7 @@ pub trait ToFromBusMessage { /// ๐ŸŽ ยป supertrait combining all bus object traits + debug + send + sync + 'static pub trait BusMessage: - ToBusVal + ToBusKey + ToFromBusMessage + Debug + Clone + Send + Sync + PartialEq + 'static + ToBusVal + ToBusKey + ToFromBusMessage + Debug + PartialEq + StreamMsg + 'static { } diff --git a/lib/bus/redis/stream/mod.rs b/lib/bus/redis/stream/mod.rs index a5c0e6a..3f6e058 100644 --- a/lib/bus/redis/stream/mod.rs +++ b/lib/bus/redis/stream/mod.rs @@ -6,19 +6,20 @@ use { tokio::sync::broadcast::{self, Sender}, }; -use crate::bus::BusMessage; +pub trait StreamMsg: Clone + Send + Sync + 'static {} -pub struct SourceStream { +// TODO: move this to a separate module, or maybe to the lool library +pub struct SourceStream { sender: Option>, } -impl Default for SourceStream { +impl Default for SourceStream { fn default() -> Self { Self::new() } } -impl SourceStream { +impl SourceStream { // Create a new SourceStream with a broadcast channel pub fn new() -> Self { let (sender, _) = broadcast::channel(100); // Adjust the buffer size as needed diff --git a/lib/rustlers/rustler.rs b/lib/rustlers/rustler.rs index b1d61f0..18e8af3 100644 --- a/lib/rustlers/rustler.rs +++ b/lib/rustlers/rustler.rs @@ -4,7 +4,7 @@ pub extern crate eyre; use { super::svc::RustlerMsg, crate::{ - bus::{BusMessage, ToBusKey, ToBusVal, ToFromBusMessage}, + bus::{redis::stream::StreamMsg, BusMessage, ToBusKey, ToBusVal, ToFromBusMessage}, entities::{market, ticker}, }, async_trait::async_trait, @@ -163,6 +163,7 @@ impl PartialEq for Quote { } } +impl StreamMsg for Quote {} impl BusMessage for Quote {} #[derive(Debug, Clone)]