From 1638b0c1081826331681ec4362723189299cea14 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Sun, 2 Jun 2024 11:01:38 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20=E2=9C=A8=20replace=20BroadcastStream?= =?UTF-8?q?=20with=20async=5Fstream=20create?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + Cargo.toml | 1 + lib/bus/redis/stream/mod.rs | 37 +++++++++++-------------------------- 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0a7703..ebe13ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1931,6 +1931,7 @@ dependencies = [ name = "rustler-core" version = "0.3.0" dependencies = [ + "async-stream", "async-trait", "chrono", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 2499486..ae3b98c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ lool = { version = "^0.3.2", registry = "lugit", features = [ "sched.rule-recurrence", "macros", ] } +async-stream = "0.3.5" [build-dependencies] tonic-build = "0.11.0" diff --git a/lib/bus/redis/stream/mod.rs b/lib/bus/redis/stream/mod.rs index 33fa96c..a5c0e6a 100644 --- a/lib/bus/redis/stream/mod.rs +++ b/lib/bus/redis/stream/mod.rs @@ -2,11 +2,8 @@ use { eyre::Result, futures::Stream, lool::fail, - std::{ - pin::Pin, - task::{Context, Poll}, - }, - tokio::sync::broadcast::{self, Receiver, Sender}, + std::pin::Pin, + tokio::sync::broadcast::{self, Sender}, }; use crate::bus::BusMessage; @@ -37,29 +34,17 @@ impl SourceStream { // Subscribe to the stream pub fn subscribe(&self) -> Result + Send + 'static>>> { if let Some(sender) = &self.sender { - let receiver = sender.subscribe(); - Ok(Box::pin(BroadcastStream { receiver })) + let mut receiver = sender.subscribe(); + + let stream = async_stream::stream! { + while let Ok(item) = receiver.recv().await { + yield item; + } + }; + + Ok(Box::pin(stream)) } else { fail!("SourceStream has been consumed") } } } - -// Wrapper around Receiver to implement Stream -struct BroadcastStream { - receiver: Receiver, -} - -impl Stream for BroadcastStream { - type Item = RM; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - // Spawn an async task to receive the message - tokio::task::block_in_place(|| match futures::executor::block_on(this.receiver.recv()) { - Ok(msg) => Poll::Ready(Some(msg)), - Err(broadcast::error::RecvError::Closed) => Poll::Ready(None), - Err(broadcast::error::RecvError::Lagged(_)) => Poll::Pending, - }) - } -}