feat: replace BroadcastStream with async_stream create

This commit is contained in:
Lucas Colombo 2024-06-02 11:01:38 -03:00
parent d1bf0c56b5
commit 1638b0c108
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
3 changed files with 13 additions and 26 deletions

1
Cargo.lock generated
View File

@ -1931,6 +1931,7 @@ dependencies = [
name = "rustler-core" name = "rustler-core"
version = "0.3.0" version = "0.3.0"
dependencies = [ dependencies = [
"async-stream",
"async-trait", "async-trait",
"chrono", "chrono",
"dotenvy", "dotenvy",

View File

@ -52,6 +52,7 @@ lool = { version = "^0.3.2", registry = "lugit", features = [
"sched.rule-recurrence", "sched.rule-recurrence",
"macros", "macros",
] } ] }
async-stream = "0.3.5"
[build-dependencies] [build-dependencies]
tonic-build = "0.11.0" tonic-build = "0.11.0"

View File

@ -2,11 +2,8 @@ use {
eyre::Result, eyre::Result,
futures::Stream, futures::Stream,
lool::fail, lool::fail,
std::{ std::pin::Pin,
pin::Pin, tokio::sync::broadcast::{self, Sender},
task::{Context, Poll},
},
tokio::sync::broadcast::{self, Receiver, Sender},
}; };
use crate::bus::BusMessage; use crate::bus::BusMessage;
@ -37,29 +34,17 @@ impl<RM: BusMessage> SourceStream<RM> {
// Subscribe to the stream // Subscribe to the stream
pub fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = RM> + Send + 'static>>> { pub fn subscribe(&self) -> Result<Pin<Box<dyn Stream<Item = RM> + Send + 'static>>> {
if let Some(sender) = &self.sender { if let Some(sender) = &self.sender {
let receiver = sender.subscribe(); let mut receiver = sender.subscribe();
Ok(Box::pin(BroadcastStream { receiver }))
let stream = async_stream::stream! {
while let Ok(item) = receiver.recv().await {
yield item;
}
};
Ok(Box::pin(stream))
} else { } else {
fail!("SourceStream has been consumed") fail!("SourceStream has been consumed")
} }
} }
} }
// Wrapper around Receiver to implement Stream
struct BroadcastStream<RM: BusMessage> {
receiver: Receiver<RM>,
}
impl<RM: BusMessage> Stream for BroadcastStream<RM> {
type Item = RM;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Spawn an async task to receive the message
tokio::task::block_in_place(|| match futures::executor::block_on(this.receiver.recv()) {
Ok(msg) => Poll::Ready(Some(msg)),
Err(broadcast::error::RecvError::Closed) => Poll::Ready(None),
Err(broadcast::error::RecvError::Lagged(_)) => Poll::Pending,
})
}
}