From 51a83371c6e1bef063512fb014edd1ca42a73b69 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Thu, 11 Apr 2024 04:25:36 -0300 Subject: [PATCH] =?UTF-8?q?feat(sched):=20=E2=9C=A8=20tokio=20scheduler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 23 +++ Cargo.toml | 25 ++- examples/sched.rs | 2 +- examples/sched_tokio.rs | 49 +++++ lib/sched/mod.rs | 6 +- lib/sched/scheduler.rs | 364 +-------------------------------- lib/sched/scheduler/threads.rs | 243 ++++++++++++++++++++++ lib/sched/scheduler/tokio.rs | 229 +++++++++++++++++++++ lib/sched/task_handler.rs | 142 +++++++++++++ 🧳 lool.code-workspace | 25 --- 10 files changed, 720 insertions(+), 388 deletions(-) create mode 100644 examples/sched_tokio.rs create mode 100644 lib/sched/scheduler/threads.rs create mode 100644 lib/sched/scheduler/tokio.rs create mode 100644 lib/sched/task_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 06eb7af..1f8f474 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,6 +183,7 @@ dependencies = [ "log", "num-traits", "tokio", + "tokio_schedule", ] [[package]] @@ -273,6 +274,28 @@ checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio_schedule" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b4a14ab1658c39d137ebcc5fbaab364c9922a6cc04ab48b364546c2e6022256" +dependencies = [ + "chrono", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1d84e94..923a4b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,15 @@ path = "lib/lib.rs" "macros" = [] # scheduling "sched" = ["dep:chrono", "dep:log"] -"sched.tokio" = ["dep:tokio", "tokio?/time", "tokio?/rt", "sched"] +"sched.tokio" = [ + "dep:tokio", + "tokio?/time", + "tokio?/rt", + "sched", + "tokio?/macros", + "tokio?/sync", +] +"sched.threads" = ["sched"] "sched.rule-recurrence" = ["sched", "dep:num-traits"] "sched.rule-cron" = ["sched", "dep:croner"] # utils @@ -48,6 +56,21 @@ log = { version = "0.4.21", optional = true } tokio = { version = "1.37.0", optional = true } croner = { version = "2.0.4", optional = true } num-traits = { version = "0.2.18", optional = true } +tokio_schedule = "0.3.1" +[[example]] +name = "sched" +path = "examples/sched.rs" +required-features = [ + "sched.threads", + "sched.rule-recurrence", +] +[[example]] +name = "sched_tokio" +path = "examples/sched_tokio.rs" +required-features = [ + "sched.tokio", + "sched.rule-recurrence", +] diff --git a/examples/sched.rs b/examples/sched.rs index af6037d..dece24b 100644 --- a/examples/sched.rs +++ b/examples/sched.rs @@ -2,7 +2,7 @@ use { eyre::{set_hook, DefaultHandler, Result}, lool::{ logger::ConsoleLogger, - sched::{recur, ruleset, Scheduler}, + sched::{recur, ruleset, scheduler::threads::Scheduler}, }, }; diff --git a/examples/sched_tokio.rs b/examples/sched_tokio.rs new file mode 100644 index 0000000..5881394 --- /dev/null +++ b/examples/sched_tokio.rs @@ -0,0 +1,49 @@ +use { + eyre::{set_hook, DefaultHandler, Result}, + lool::{ + logger::ConsoleLogger, + sched::{recur, ruleset, scheduler::tokio::Scheduler}, + }, + tokio::time::sleep, +}; + +fn setup_eyre() { + let _ = set_hook(Box::new(DefaultHandler::default_with)); +} + +async fn my_action() { + let now = chrono::Local::now(); + println!("I'm running at {}", now.format("%Y-%m-%d %H:%M:%S")); + + sleep(std::time::Duration::from_secs(15)).await; +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + setup_eyre(); + ConsoleLogger::default_setup(log::Level::Trace, "lool::sched::recur")?; + + let mut sched = Scheduler::new(); + log::debug!("scheduler created"); + + let handler = sched.schedule("test-task", my_action, recur(ruleset().at_second(0))).await; + + sleep(std::time::Duration::from_secs(1)).await; + + loop { + let name = handler.name(); + println!("{:?}", handler); + + handler.get_next_run(); + + sleep(std::time::Duration::from_secs(60)).await; + + let result = sched.remove(&handler).await; + + if result.is_ok() { + println!("task {name} removed"); + } else { + println!("task {name} not present in the scheduler"); + } + } +} diff --git a/lib/sched/mod.rs b/lib/sched/mod.rs index e5ff6ba..28a2edf 100644 --- a/lib/sched/mod.rs +++ b/lib/sched/mod.rs @@ -1,6 +1,6 @@ mod rules; -mod scheduler; +pub use rules::*; +pub mod scheduler; +pub mod task_handler; pub mod utils; - -pub use {rules::*, scheduler::*}; diff --git a/lib/sched/scheduler.rs b/lib/sched/scheduler.rs index b6c4954..f04b0ba 100644 --- a/lib/sched/scheduler.rs +++ b/lib/sched/scheduler.rs @@ -1,360 +1,8 @@ -use { - eyre::{eyre, Result}, - log::debug, - std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, AtomicPtr, Ordering}, - Arc, Mutex, - }, - thread::{self, JoinHandle}, - }, -}; +#[cfg(not(any(feature = "sched.tokio", feature = "sched.threads")))] +compile_error!("at least one of 'sched.tokio' or 'sched.threads' features must be enabled"); -use { - super::SchedulingRule, - chrono::{DateTime, Local}, -}; +#[cfg(feature = "sched.tokio")] +pub mod tokio; -type Action = Box; - -/// 🧉 » a scheduled task -/// -/// this structs represents a task that has been scheduled in the scheduler. -/// -/// this is returned by the `Scheduler::schedule` method, and can be used to check and control the -/// status of the task. -pub struct ScheduledTask { - #[allow(dead_code)] - name: String, - action: Action, - rules: Arc>, - is_running: Arc, - is_stopped: Arc, - is_removed: Arc, - last_run: Arc>>, -} - -impl ScheduledTask { - fn run(&mut self) { - let action = self.action.as_mut(); - action(); - } - - fn make_handler(&self) -> TaskHandler { - TaskHandler { - name: self.name.clone(), - rules: self.rules.clone(), - is_running: self.is_running.clone(), - is_stopped: self.is_stopped.clone(), - is_removed: self.is_removed.clone(), - last_run: self.last_run.clone(), - } - } - - fn is_active(&self) -> bool { - !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) - } - - fn is_removed(&self) -> bool { - self.is_removed.load(Ordering::Relaxed) - } -} - -/// 🧉 » a task scheduler. -/// -/// this struct is responsible for scheduling tasks to be executed at specific times, depending on -/// the rules provided for each task. -/// -/// Each task can have n rules, and the task will be executed when any of the rules is met. -pub struct Scheduler { - tasks: HashMap>>, -} - -impl Default for Scheduler { - fn default() -> Self { - Self::new() - } -} - -impl Scheduler { - /// 🧉 » create a new scheduler - pub fn new() -> Self { - Self { - tasks: HashMap::new(), - } - } - - /// 🧉 » schedule a task - /// - /// schedules a task to be executed at times determined by the provided rules. - pub fn schedule(&mut self, name: &str, action: F, rules: SchedulingRule) -> TaskHandler - where - F: FnMut() + Send + Sync + 'static, - { - self.schedule_many_rules(name, action, vec![rules]) - } - - /// 🧉 » schedule a task - /// - /// schedules a task to be executed at times determined by the provided rules. - pub fn schedule_many_rules( - &mut self, - name: &str, - action: F, - rules: Vec, - ) -> TaskHandler - where - F: FnMut() + Send + Sync + 'static, - { - let task = Arc::new(Mutex::new(ScheduledTask { - name: name.to_string(), - action: Box::new(action), - rules: Arc::new(rules), - is_running: Arc::new(AtomicBool::new(false)), - is_stopped: Arc::new(AtomicBool::new(false)), - is_removed: Arc::new(AtomicBool::new(false)), - last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())), - })); - - self.tasks.insert(name.to_string(), task.clone()); - - // launch the task in its own thread - spawn_task(task.clone()); - - let handler: TaskHandler = { - let task = task.lock().unwrap(); - task.make_handler() - }; - - handler - } - - /// 🧉 » stop a task - pub fn stop(&mut self, handler: &TaskHandler) -> Result<()> { - // get the task from the tasks map - let task = self.tasks.get(handler.name()); - - if let Some(task) = task { - if let Ok(task) = task.lock() { - task.is_stopped.store(true, Ordering::Relaxed); - debug!("task {} has been stopped", handler.name()); - Ok(()) - } else { - Err(eyre!("error stopping task {}", handler.name())) - } - } else { - Err(eyre!("task {} was not found", handler.name())) - } - } - - /// 🧉 » resume a task - pub fn resume(&mut self, handler: &TaskHandler) -> Result<()> { - // get the task from the tasks map - let task = self.tasks.get(handler.name()); - - if let Some(task) = task { - if let Ok(task) = task.lock() { - task.is_stopped.store(false, Ordering::Relaxed); - debug!("task {} has been resumed", handler.name()); - Ok(()) - } else { - Err(eyre!("error resuming task {}", handler.name())) - } - } else { - Err(eyre!("task {} was not found", handler.name())) - } - } - - /// 🧉 » remove a task - pub fn remove(&mut self, handler: &TaskHandler) -> Result<()> { - // get the task from the tasks map - let task = self.tasks.remove(handler.name()); - - if let Some(task) = task { - if let Ok(task) = &mut task.lock() { - task.is_removed.store(true, Ordering::Relaxed); - debug!("task {} has been removed", handler.name()); - - Ok(()) - } else { - Err(eyre!("error removing task {}", handler.name())) - } - } else { - handler.is_removed.store(true, Ordering::Relaxed); - Err(eyre!("task {} was not found", handler.name())) - } - } -} - -/// 🧉 » task handler -/// -/// returned by the `Scheduler::schedule` method, -/// this struct can be used to check and control -/// the status of the task. -#[derive(Clone, Debug)] -pub struct TaskHandler { - name: String, - rules: Arc>, - is_running: Arc, - is_stopped: Arc, - is_removed: Arc, - last_run: Arc>>, -} - -impl TaskHandler { - /// 🧉 » last run date - /// - /// returns a `DateTime` representing the last time the task was run or None if the task - /// has never been run before. - pub fn get_last_run(&self) -> Option> { - let last_run = self.last_run.load(Ordering::Relaxed); - if last_run.is_null() { - None - } else { - Some(unsafe { *last_run }) - } - } - - /// 🧉 » next run date - /// - /// returns a `DateTime` representing the next time the task is scheduled to run - pub fn get_next_run(&self) -> Option> { - if self.is_active() { - return get_next_run_time(&self.rules, None); - } - - None - } - - /// 🧉 » is running? - /// - /// returns a `bool` indicating if the task is currently running in this moment - pub fn is_running(&self) -> bool { - self.is_running.load(Ordering::Relaxed) - } - - /// 🧉 » name - /// - /// returns the name of the task - pub fn name(&self) -> &str { - &self.name - } - - /// 🧉 » is active? - /// - /// returns a `bool` indicating if the task is active - pub fn is_active(&self) -> bool { - !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) - } - - /// 🧉 » is stopped? - /// - /// returns a `bool` indicating if the task has been stopped - /// - /// a task being stopped means that it has been paused, but not removed from the scheduler. - /// So, although it's not running, it's still in the scheduler and can be resumed. - pub fn is_stopped(&self) -> bool { - self.is_stopped.load(Ordering::Relaxed) - } - - /// 🧉 » is removed? - /// - /// returns a `bool` indicating if the task has been removed - /// - /// once a task is removed, it's no longer in the scheduler and can't be resumed. - pub fn is_removed(&self) -> bool { - self.is_removed.load(Ordering::Relaxed) - } -} - -/// **main function to run the task in its own thread** -/// -/// it creates a new thread and runs the task according to its scheduling rules. -fn spawn_task(task_mutex: Arc>) -> JoinHandle<()> { - let thread = thread::spawn(move || { - let (mut maybe_next_run, name) = { - let task = task_mutex.lock().unwrap(); - let rules = &task.rules; - - (get_next_run_time(rules, None), task.name.clone()) - }; - - while let Some(run_date) = maybe_next_run { - let now = Local::now(); - if run_date > now { - // if the next run is in the future, go to bed until then - let sleep_until = run_date - now; - debug!( - "task {} will run in {} seconds", - name, - sleep_until.num_seconds() - ); - - std::thread::sleep(sleep_until.to_std().unwrap()); - } else { - // if the next run is in the past, run the task immediately, probably missed the - // run time for a few nanos - debug!("task will run in 0 seconds"); - } - - let mut task = task_mutex.lock().unwrap(); - - if task.is_active() { - let run_date_box = Box::new(run_date); - let run_date_raw = Box::into_raw(run_date_box); - - task.last_run.store(run_date_raw, Ordering::Relaxed); - task.is_running.store(true, Ordering::SeqCst); - task.run(); - task.is_running.store(false, Ordering::SeqCst); - } - - if !task.is_removed() { - maybe_next_run = get_next_run_time(&task.rules, Some(run_date)); - } else { - maybe_next_run = None; - } - } - - debug!("task {} has finished", name); - }); - - thread -} - -/// **get next run time** -/// -/// this function takes a list of scheduling rules and a base time, and returns the next time the -/// task should run. -/// -/// to determine the next run time, it iterates over the list of rules and calculates the next run -/// time for each of them, returning the earliest of them all. -fn get_next_run_time( - rules: &Vec, - from: Option>, -) -> Option> { - let mut next_run_so_far: Option> = None; - - let base = if let Some(from) = from { - from - } else { - Local::now() - }; - - for rule in rules { - let rule_next_run = rule.next_from(base); - - if let Some(next_run) = rule_next_run { - if let Some(d) = next_run_so_far { - if next_run < d { - next_run_so_far = Some(next_run); - } - } else { - next_run_so_far = Some(next_run); - } - } - } - - next_run_so_far -} +#[cfg(feature = "sched.threads")] +pub mod threads; diff --git a/lib/sched/scheduler/threads.rs b/lib/sched/scheduler/threads.rs new file mode 100644 index 0000000..83ad267 --- /dev/null +++ b/lib/sched/scheduler/threads.rs @@ -0,0 +1,243 @@ +use { + crate::sched::{ + rules::SchedulingRule, + task_handler::{get_next_run_time, TaskHandler}, + }, + chrono::{DateTime, Local}, + eyre::{eyre, Result}, + log::debug, + std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, AtomicPtr, Ordering}, + Arc, Mutex, + }, + thread::{self, JoinHandle}, + }, +}; + +type Action = Box; + +/// 🧉 » a scheduled task +/// +/// this structs represents a task that has been scheduled in the scheduler. +/// +/// this is returned by the `Scheduler::schedule` method, and can be used to check and control the +/// status of the task. +pub struct ScheduledTask { + #[allow(dead_code)] + name: String, + action: Action, + rules: Arc>, + is_running: Arc, + is_stopped: Arc, + is_removed: Arc, + last_run: Arc>>, +} + +impl ScheduledTask { + fn run(&mut self) { + let action = self.action.as_mut(); + action(); + } + + fn make_handler(&self) -> TaskHandler { + TaskHandler { + name: self.name.clone(), + rules: self.rules.clone(), + is_running: self.is_running.clone(), + is_stopped: self.is_stopped.clone(), + is_removed: self.is_removed.clone(), + last_run: self.last_run.clone(), + } + } + + fn is_active(&self) -> bool { + !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) + } + + fn is_removed(&self) -> bool { + self.is_removed.load(Ordering::Relaxed) + } +} + +/// 🧉 » a task scheduler. +/// +/// this struct is responsible for scheduling tasks to be executed at specific times, depending on +/// the rules provided for each task. +/// +/// Each task can have n rules, and the task will be executed when any of the rules is met. +pub struct Scheduler { + tasks: HashMap>>, +} + +impl Default for Scheduler { + fn default() -> Self { + Self::new() + } +} + +impl Scheduler { + /// 🧉 » create a new scheduler + pub fn new() -> Self { + Self { + tasks: HashMap::new(), + } + } + + /// 🧉 » schedule a task + /// + /// schedules a task to be executed at times determined by the provided rules. + pub fn schedule(&mut self, name: &str, action: F, rules: SchedulingRule) -> TaskHandler + where + F: FnMut() + Send + Sync + 'static, + { + self.schedule_many_rules(name, action, vec![rules]) + } + + /// 🧉 » schedule a task + /// + /// schedules a task to be executed at times determined by the provided rules. + pub fn schedule_many_rules( + &mut self, + name: &str, + action: F, + rules: Vec, + ) -> TaskHandler + where + F: FnMut() + Send + Sync + 'static, + { + let task = Arc::new(Mutex::new(ScheduledTask { + name: name.to_string(), + action: Box::new(action), + rules: Arc::new(rules), + is_running: Arc::new(AtomicBool::new(false)), + is_stopped: Arc::new(AtomicBool::new(false)), + is_removed: Arc::new(AtomicBool::new(false)), + last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())), + })); + + self.tasks.insert(name.to_string(), task.clone()); + + // launch the task in its own thread + spawn_task(task.clone()); + + let handler: TaskHandler = { + let task = task.lock().unwrap(); + task.make_handler() + }; + + handler + } + + /// 🧉 » stop a task + pub fn stop(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + if let Ok(task) = task.lock() { + task.is_stopped.store(true, Ordering::Relaxed); + debug!("task {} has been stopped", handler.name()); + Ok(()) + } else { + Err(eyre!("error stopping task {}", handler.name())) + } + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » resume a task + pub fn resume(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + if let Ok(task) = task.lock() { + task.is_stopped.store(false, Ordering::Relaxed); + debug!("task {} has been resumed", handler.name()); + Ok(()) + } else { + Err(eyre!("error resuming task {}", handler.name())) + } + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » remove a task + pub fn remove(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.remove(handler.name()); + + if let Some(task) = task { + if let Ok(task) = &mut task.lock() { + task.is_removed.store(true, Ordering::Relaxed); + debug!("task {} has been removed", handler.name()); + + Ok(()) + } else { + Err(eyre!("error removing task {}", handler.name())) + } + } else { + handler.is_removed.store(true, Ordering::Relaxed); + Err(eyre!("task {} was not found", handler.name())) + } + } +} + +/// **main function to run the task in its own thread** +/// +/// it creates a new thread and runs the task according to its scheduling rules. +fn spawn_task(task_mutex: Arc>) -> JoinHandle<()> { + let thread = thread::spawn(move || { + let (mut maybe_next_run, name) = { + let task = task_mutex.lock().unwrap(); + let rules = &task.rules; + + (get_next_run_time(rules, None), task.name.clone()) + }; + + while let Some(run_date) = maybe_next_run { + let now = Local::now(); + if run_date > now { + // if the next run is in the future, go to bed until then + let sleep_until = run_date - now; + debug!( + "task {} will run in {} seconds", + name, + sleep_until.num_seconds() + ); + + std::thread::sleep(sleep_until.to_std().unwrap()); + } else { + // if the next run is in the past, run the task immediately, probably missed the + // run time for a few nanos + debug!("task will run in 0 seconds"); + } + + let mut task = task_mutex.lock().unwrap(); + + if task.is_active() { + let run_date_box = Box::new(run_date); + let run_date_raw = Box::into_raw(run_date_box); + + task.last_run.store(run_date_raw, Ordering::Relaxed); + task.is_running.store(true, Ordering::SeqCst); + task.run(); + task.is_running.store(false, Ordering::SeqCst); + } + + if !task.is_removed() { + maybe_next_run = get_next_run_time(&task.rules, Some(run_date)); + } else { + maybe_next_run = None; + } + } + + debug!("task {} has finished", name); + }); + + thread +} diff --git a/lib/sched/scheduler/tokio.rs b/lib/sched/scheduler/tokio.rs new file mode 100644 index 0000000..58d97dd --- /dev/null +++ b/lib/sched/scheduler/tokio.rs @@ -0,0 +1,229 @@ +use { + crate::sched::{ + rules::SchedulingRule, + task_handler::{get_next_run_time, TaskHandler}, + }, + chrono::{DateTime, Local}, + eyre::{eyre, Result}, + log::debug, + std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicPtr, Ordering}, + Arc, + }, + }, + tokio::{spawn, sync::Mutex, task::JoinHandle, time::sleep}, +}; + +struct ScheduledTask { + name: String, + action: Pin + Send + 'static>>, + rules: Arc>, + is_running: Arc, + is_stopped: Arc, + is_removed: Arc, + last_run: Arc>>, +} + +impl ScheduledTask { + async fn run(&mut self) { + self.action.as_mut().await; + } + + fn make_handler(&self) -> TaskHandler { + TaskHandler { + name: self.name.clone(), + rules: self.rules.clone(), + is_running: self.is_running.clone(), + is_stopped: self.is_stopped.clone(), + is_removed: self.is_removed.clone(), + last_run: self.last_run.clone(), + } + } + + fn is_active(&self) -> bool { + !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) + } + + fn is_removed(&self) -> bool { + self.is_removed.load(Ordering::Relaxed) + } +} + +/// 🧉 » a task scheduler. +/// +/// this struct is responsible for scheduling tasks to be executed at specific times, depending on +/// the rules provided for each task. +/// +/// Each task can have n rules, and the task will be executed when any of the rules is met. +pub struct Scheduler { + tasks: HashMap>>, +} + +impl Default for Scheduler { + fn default() -> Self { + Self::new() + } +} + +impl Scheduler { + /// 🧉 » create a new scheduler + pub fn new() -> Self { + Self { + tasks: HashMap::new(), + } + } + + /// 🧉 » schedule a task + /// + /// schedules a task to be executed at times determined by the provided rules. + pub async fn schedule( + &mut self, + name: &str, + func: F, + rules: SchedulingRule, + ) -> TaskHandler + where + F: FnMut() -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + self.schedule_many_rules(name, func, vec![rules]).await + } + + /// 🧉 » schedule a task + /// + /// schedules a task to be executed at times determined by the provided rules. + pub async fn schedule_many_rules( + &mut self, + name: &str, + mut func: F, + rules: Vec, + ) -> TaskHandler + where + F: FnMut() -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + let task = Arc::new(Mutex::new(ScheduledTask { + name: name.to_string(), + action: Box::pin(func()), + rules: Arc::new(rules), + is_running: Arc::new(AtomicBool::new(false)), + is_stopped: Arc::new(AtomicBool::new(false)), + is_removed: Arc::new(AtomicBool::new(false)), + last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())), + })); + + self.tasks.insert(name.to_string(), task.clone()); + + spawn_task(task.clone()); + + let handler: TaskHandler = { + let task = task.lock().await; + task.make_handler() + }; + + handler + } + + /// 🧉 » stop a task + pub async fn stop(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + let task = task.lock().await; + task.is_stopped.store(true, Ordering::Relaxed); + debug!("task {} has been stopped", handler.name()); + Ok(()) + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » resume a task + pub async fn resume(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + let task = task.lock().await; + task.is_stopped.store(false, Ordering::Relaxed); + debug!("task {} has been resumed", handler.name()); + Ok(()) + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » remove a task + pub async fn remove(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.remove(handler.name()); + + if let Some(task) = task { + let task = task.lock().await; + task.is_removed.store(true, Ordering::Relaxed); + debug!("task {} has been removed", handler.name()); + Ok(()) + } else { + handler.is_removed.store(true, Ordering::Relaxed); + Err(eyre!("task {} was not found", handler.name())) + } + } +} + +/// **main function to spawn a task in tokio** +/// +/// it spawns a new tokio task and runs the task according to its scheduling rules. +fn spawn_task(task_mutex: Arc>) -> JoinHandle<()> { + spawn(async move { + let (mut maybe_next_run, name) = { + let task = task_mutex.lock().await; + let rules = &task.rules; + + (get_next_run_time(rules, None), task.name.clone()) + }; + + while let Some(run_date) = maybe_next_run { + let now = Local::now(); + if run_date > now { + // if the next run is in the future, go to bed until then + let sleep_until = run_date - now; + debug!( + "task {} will run in {} seconds", + name, + sleep_until.num_seconds() + ); + + sleep(sleep_until.to_std().unwrap()).await; + } else { + // if the next run is in the past, run the task immediately, probably missed the + // run time for a few nanos + debug!("task will run in 0 seconds"); + } + + let mut task = task_mutex.lock().await; + + if task.is_active() { + let run_date_box = Box::new(run_date); + let run_date_raw = Box::into_raw(run_date_box); + + task.last_run.store(run_date_raw, Ordering::Relaxed); + task.is_running.store(true, Ordering::SeqCst); + task.run().await; + task.is_running.store(false, Ordering::SeqCst); + } + + if !task.is_removed() { + maybe_next_run = get_next_run_time(&task.rules, Some(run_date)); + } else { + maybe_next_run = None; + } + } + + debug!("task {} has finished", name); + }) +} diff --git a/lib/sched/task_handler.rs b/lib/sched/task_handler.rs new file mode 100644 index 0000000..febe303 --- /dev/null +++ b/lib/sched/task_handler.rs @@ -0,0 +1,142 @@ +use { + super::rules::SchedulingRule, + chrono::{DateTime, Local}, + core::fmt, + std::{ + fmt::{Debug, Formatter}, + sync::{ + atomic::{AtomicBool, AtomicPtr, Ordering}, + Arc, + }, + }, +}; + +/// 🧉 » task handler +/// +/// returned by the `Scheduler::schedule` method, +/// this struct can be used to check and control +/// the status of the task. +#[derive(Clone)] +pub struct TaskHandler { + pub(crate) name: String, + pub(crate) rules: Arc>, + pub(crate) is_running: Arc, + pub(crate) is_stopped: Arc, + pub(crate) is_removed: Arc, + pub(crate) last_run: Arc>>, +} + +impl Debug for TaskHandler { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskHandler") + .field("name", &self.name) + .field("is_running", &self.is_running) + .field("is_stopped", &self.is_stopped) + .field("is_removed", &self.is_removed) + .field("last_run", &self.get_last_run()) + .field("next_run", &self.get_next_run()) + .finish() + } +} + +impl TaskHandler { + /// 🧉 » last run date + /// + /// returns a `DateTime` representing the last time the task was run or None if the task + /// has never been run before. + pub fn get_last_run(&self) -> Option> { + let last_run = self.last_run.load(Ordering::Relaxed); + if last_run.is_null() { + None + } else { + Some(unsafe { *last_run }) + } + } + + /// 🧉 » next run date + /// + /// returns a `DateTime` representing the next time the task is scheduled to run + pub fn get_next_run(&self) -> Option> { + if self.is_active() { + return get_next_run_time(&self.rules, None); + } + + None + } + + /// 🧉 » is running? + /// + /// returns a `bool` indicating if the task is currently running in this moment + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::Relaxed) + } + + /// 🧉 » name + /// + /// returns the name of the task + pub fn name(&self) -> &str { + &self.name + } + + /// 🧉 » is active? + /// + /// returns a `bool` indicating if the task is active + pub fn is_active(&self) -> bool { + !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) + } + + /// 🧉 » is stopped? + /// + /// returns a `bool` indicating if the task has been stopped + /// + /// a task being stopped means that it has been paused, but not removed from the scheduler. + /// So, although it's not running, it's still in the scheduler and can be resumed. + pub fn is_stopped(&self) -> bool { + self.is_stopped.load(Ordering::Relaxed) + } + + /// 🧉 » is removed? + /// + /// returns a `bool` indicating if the task has been removed + /// + /// once a task is removed, it's no longer in the scheduler and can't be resumed. + pub fn is_removed(&self) -> bool { + self.is_removed.load(Ordering::Relaxed) + } +} + +/// **get next run time** +/// +/// this function takes a list of scheduling rules and a base time, and returns the next time the +/// task should run. +/// +/// to determine the next run time, it iterates over the list of rules and calculates the next run +/// time for each of them, returning the earliest of them all. +pub(crate) fn get_next_run_time( + rules: &Vec, + from: Option>, +) -> Option> { + let mut next_run_so_far: Option> = None; + + let base = if let Some(from) = from { + from + } else { + Local::now() + }; + + for rule in rules { + let rule_next_run = rule.next_from(base); + + if let Some(next_run) = rule_next_run { + if let Some(d) = next_run_so_far { + if next_run < d { + next_run_so_far = Some(next_run); + } + } else { + next_run_so_far = Some(next_run); + } + } + } + + next_run_so_far +} diff --git a/🧳 lool.code-workspace b/🧳 lool.code-workspace index 8e7a9b9..d6d1084 100644 --- a/🧳 lool.code-workspace +++ b/🧳 lool.code-workspace @@ -19,30 +19,5 @@ }, "rust-analyzer.cargo.features": "all", - "files.exclude": { - "**/.git": false, - "**/.svn": false, - "**/.hg": false, - "**/CVS": false, - "**/.DS_Store": false, - "**/Thumbs.db": true, - "**/.classpath": false, - "**/.factorypath": true, - "**/.project": false, - "**/.settings": true, - "*.ids": false, - "*.iml": false, - "*.ipr": false, - "*.iws": false, - "*.orig": false, - ".gradle": false, - ".idea/": false, - ".out/": false, - ".settings": false, - ".vscode": false, - "bin/": false, - "build/": false, - "out/": false - }, } }