From ebb4e94f1a04913725337c93aa5d1b4f57dc4516 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Sun, 21 Apr 2024 13:21:07 -0300 Subject: [PATCH] =?UTF-8?q?feat(sched):=20=E2=9C=A8=20tokio:=20schedule=20?= =?UTF-8?q?a=20future?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/sched_tokio_future.rs | 46 ++++++++++++++++++++++++++++++++++ lib/sched/scheduler/tokio.rs | 33 ++++++++++++++++++------ 2 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 examples/sched_tokio_future.rs diff --git a/examples/sched_tokio_future.rs b/examples/sched_tokio_future.rs new file mode 100644 index 0000000..e29ff0f --- /dev/null +++ b/examples/sched_tokio_future.rs @@ -0,0 +1,46 @@ +use { + eyre::{set_hook, DefaultHandler, Result}, + lool::{ + logger::ConsoleLogger, + sched::{recur, ruleset, scheduler::tokio::Scheduler}, + }, + tokio::time::sleep, +}; + +fn setup_eyre() { + let _ = set_hook(Box::new(DefaultHandler::default_with)); +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + setup_eyre(); + ConsoleLogger::default_setup(log::Level::Trace, "lool::sched::recur")?; + + let mut sched = Scheduler::new(); + log::debug!("scheduler created"); + + let now = chrono::Local::now(); + + let handler = sched.schedule_fut("test-task", async move { + println!("I'm running at {}", &now.format("%Y-%m-%d %H:%M:%S")); + }, recur(ruleset().at_second(0))).await; + + sleep(std::time::Duration::from_secs(1)).await; + + loop { + let name = handler.name(); + println!("{:?}", handler); + + handler.get_next_run(); + + sleep(std::time::Duration::from_secs(60)).await; + + let result = sched.remove(&handler).await; + + if result.is_ok() { + println!("task {name} removed"); + } else { + println!("task {name} not present in the scheduler"); + } + } +} diff --git a/lib/sched/scheduler/tokio.rs b/lib/sched/scheduler/tokio.rs index f92d841..ebfe073 100644 --- a/lib/sched/scheduler/tokio.rs +++ b/lib/sched/scheduler/tokio.rs @@ -77,9 +77,10 @@ impl Scheduler { } } - /// 🧉 » schedule a task + /// 🧉 » schedule an async task /// - /// schedules a task to be executed at times determined by the provided rules. + /// schedules an async function to be executed as a task at time intervals determined by the + /// provided /// rules. pub async fn schedule( &mut self, name: Str, @@ -92,25 +93,43 @@ impl Scheduler { Str: AsRef, { let name = name.as_ref(); - self.schedule_many_rules(name, func, vec![rules]).await + let mut func = func; + self.schedule_many_rules(name, func(), vec![rules]).await + } + + /// 🧉 » schedule a future + /// + /// schedules a future to be executed as a task at time intervals determined by the provided + /// rules. + pub async fn schedule_fut( + &mut self, + name: Str, + future: Fut, + rules: SchedulingRule, + ) -> TaskHandler + where + Fut: Future + Send + 'static, + Str: AsRef, + { + let name = name.as_ref(); + self.schedule_many_rules(name, future, vec![rules]).await } /// 🧉 » schedule a task /// /// schedules a task to be executed at times determined by the provided rules. - pub async fn schedule_many_rules( + pub async fn schedule_many_rules( &mut self, name: &str, - mut func: F, + future: Fut, rules: Vec, ) -> TaskHandler where - F: FnMut() -> Fut + Send + 'static, Fut: Future + Send + 'static, { let task = Arc::new(Mutex::new(ScheduledTask { name: name.to_string(), - action: Box::pin(func()), + action: Box::pin(future), rules: Arc::new(rules), is_running: Arc::new(AtomicBool::new(false)), is_stopped: Arc::new(AtomicBool::new(false)),