diff --git a/Cargo.lock b/Cargo.lock index b0a7703..ebe13ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1931,6 +1931,7 @@ dependencies = [ name = "rustler-core" version = "0.3.0" dependencies = [ + "async-stream", "async-trait", "chrono", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 2499486..ae3b98c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ lool = { version = "^0.3.2", registry = "lugit", features = [ "sched.rule-recurrence", "macros", ] } +async-stream = "0.3.5" [build-dependencies] tonic-build = "0.11.0" diff --git a/lib/bus/redis/stream/mod.rs b/lib/bus/redis/stream/mod.rs index 33fa96c..a5c0e6a 100644 --- a/lib/bus/redis/stream/mod.rs +++ b/lib/bus/redis/stream/mod.rs @@ -2,11 +2,8 @@ use { eyre::Result, futures::Stream, lool::fail, - std::{ - pin::Pin, - task::{Context, Poll}, - }, - tokio::sync::broadcast::{self, Receiver, Sender}, + std::pin::Pin, + tokio::sync::broadcast::{self, Sender}, }; use crate::bus::BusMessage; @@ -37,29 +34,17 @@ impl SourceStream { // Subscribe to the stream pub fn subscribe(&self) -> Result + Send + 'static>>> { if let Some(sender) = &self.sender { - let receiver = sender.subscribe(); - Ok(Box::pin(BroadcastStream { receiver })) + let mut receiver = sender.subscribe(); + + let stream = async_stream::stream! { + while let Ok(item) = receiver.recv().await { + yield item; + } + }; + + Ok(Box::pin(stream)) } else { fail!("SourceStream has been consumed") } } } - -// Wrapper around Receiver to implement Stream -struct BroadcastStream { - receiver: Receiver, -} - -impl Stream for BroadcastStream { - type Item = RM; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - // Spawn an async task to receive the message - tokio::task::block_in_place(|| match futures::executor::block_on(this.receiver.recv()) { - Ok(msg) => Poll::Ready(Some(msg)), - Err(broadcast::error::RecvError::Closed) => Poll::Ready(None), - Err(broadcast::error::RecvError::Lagged(_)) => Poll::Pending, - }) - } -}