From 5b8a9faabab863fc8bc10da5d95a66e17cac3dd1 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Wed, 10 Apr 2024 21:44:20 -0300 Subject: [PATCH] =?UTF-8?q?feat(sched):=20=E2=9C=A8=20proper=20task=20hand?= =?UTF-8?q?lers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/sched.rs | 21 +++---- lib/sched/scheduler.rs | 136 ++++++++++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 41 deletions(-) diff --git a/examples/sched.rs b/examples/sched.rs index 3e7e800..8070c94 100644 --- a/examples/sched.rs +++ b/examples/sched.rs @@ -3,6 +3,8 @@ use lool::sched::{recur, ruleset, Scheduler}; fn my_action() { let now = chrono::Local::now(); println!("I'm running at {}", now.format("%Y-%m-%d %H:%M:%S")); + + std::thread::sleep(std::time::Duration::from_secs(15)); } fn main() { @@ -13,18 +15,15 @@ fn main() { std::thread::sleep(std::time::Duration::from_secs(1)); loop { - { - let task = handler.task.lock().unwrap(); + let is_running = handler.is_running(); + let last_run = handler.get_last_run(); + let next_run = handler.get_next_run(); + let name = handler.name(); - let is_running = task.is_running(); - let last_run = task.get_last_run(); - let name = task.name(); - - println!( - "task {} |--> is running: {}, last run: {:?}", - name, is_running, last_run - ); - } + println!( + "task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}" + ); + std::thread::sleep(std::time::Duration::from_secs(2)); } diff --git a/lib/sched/scheduler.rs b/lib/sched/scheduler.rs index 1f86c3d..7d814e9 100644 --- a/lib/sched/scheduler.rs +++ b/lib/sched/scheduler.rs @@ -1,5 +1,5 @@ use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering}, Arc, Mutex, }; @@ -21,12 +21,14 @@ type Action = Box; /// status of the task. pub struct ScheduledTask { #[allow(dead_code)] - index: usize, + index: Arc, name: String, action: Action, - rules: Vec, - is_running: AtomicBool, - last_run: Arc>>>, // TODO: remaining limits + rules: Arc>, + is_running: Arc, + is_stopped: Arc, + is_removed: Arc, + last_run: Arc>>, } impl ScheduledTask { @@ -35,17 +37,16 @@ impl ScheduledTask { action(); } - pub fn get_last_run(&self) -> Option> { - let last_run_lock = self.last_run.lock().unwrap(); - last_run_lock.as_ref().cloned() - } - - pub fn is_running(&self) -> bool { - self.is_running.load(Ordering::Relaxed) - } - - pub fn name(&self) -> &str { - &self.name + fn make_handler(&self) -> TaskHandler { + TaskHandler { + index: self.index.clone(), + name: self.name.clone(), + rules: self.rules.clone(), + is_running: self.is_running.clone(), + is_stopped: self.is_stopped.clone(), + is_removed: self.is_removed.clone(), + last_run: self.last_run.clone(), + } } } @@ -113,19 +114,26 @@ impl Scheduler { let index = self.tasks.len(); let task = Arc::new(Mutex::new(ScheduledTask { - index, + index: Arc::new(AtomicUsize::new(index)), name: name.to_string(), action: Box::new(action), - rules, - is_running: AtomicBool::new(false), - last_run: Arc::new(Mutex::new(None)), + rules: Arc::new(rules), + is_running: Arc::new(AtomicBool::new(false)), + is_stopped: Arc::new(AtomicBool::new(false)), + is_removed: Arc::new(AtomicBool::new(false)), + last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())), })); self.tasks.push(task.clone()); run_in_pool(task.clone(), &self.pool); - TaskHandler { task: task.clone() } + let handler: TaskHandler = { + let task = task.lock().unwrap(); + task.make_handler() + }; + + handler } } @@ -134,14 +142,77 @@ impl Scheduler { /// returned by the `Scheduler::schedule` method, /// this struct can be used to check and control /// the status of the task. +#[derive(Clone)] pub struct TaskHandler { - // HACK: holding the task in the TaskHandler is a temporal hack - // TaskHandler should hold Atomic references to the important parts of the task - // instead. e.g. is_running, last_run, etc. - // the problem is with last_run, as its a DateTime, and not a primitive type - // we could get around this by instead of holding the DateTime, holding the i64 - // value (unix timestamp) and then converting it to a DateTime when needed. - pub task: Arc>, + name: String, + index: Arc, + rules: Arc>, + is_running: Arc, + is_stopped: Arc, + is_removed: Arc, + last_run: Arc>>, +} + +impl TaskHandler { + /// 🧉 » last run date + /// + /// returns a `DateTime` representing the last time the task was run or None if the task + /// has never been run before. + pub fn get_last_run(&self) -> Option> { + let last_run = self.last_run.load(Ordering::Relaxed); + if last_run.is_null() { + None + } else { + Some(unsafe { *last_run }) + } + } + + /// 🧉 » next run date + /// + /// returns a `DateTime` representing the next time the task is scheduled to run + pub fn get_next_run(&self) -> Option> { + get_next_run_time(&self.rules, None) + } + + /// 🧉 » is running? + /// + /// returns a `bool` indicating if the task is currently running in this moment + pub fn is_running(&self) -> bool { + self.is_running.load(Ordering::Relaxed) + } + + /// 🧉 » name + /// + /// returns the name of the task + pub fn name(&self) -> &str { + &self.name + } + + /// 🧉 » is active? + /// + /// returns a `bool` indicating if the task is active + pub fn is_active(&self) -> bool { + !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) + } + + /// 🧉 » is stopped? + /// + /// returns a `bool` indicating if the task has been stopped + /// + /// a task being stopped means that it has been paused, but not removed from the scheduler. + /// So, although it's not running, it's still in the scheduler and can be resumed. + pub fn is_stopped(&self) -> bool { + self.is_stopped.load(Ordering::Relaxed) + } + + /// 🧉 » is removed? + /// + /// returns a `bool` indicating if the task has been removed + /// + /// once a task is removed, it's no longer in the scheduler and can't be resumed. + pub fn is_removed(&self) -> bool { + self.is_removed.load(Ordering::Relaxed) + } } /// **main function to run the task in the thread pool** @@ -176,11 +247,16 @@ fn run_in_pool(task_mutex: Arc>, pool: &ThreadPool) { let mut task = task_mutex.lock().unwrap(); - task.last_run = Arc::new(Mutex::new(Some(run_date))); + let run_date_box = Box::new(run_date); + let run_date_raw = Box::into_raw(run_date_box); + task.last_run.store(run_date_raw, Ordering::Relaxed); + task.is_running.store(true, Ordering::SeqCst); task.run(); task.is_running.store(false, Ordering::SeqCst); - maybe_next_run = get_next_run_time(&task.rules, Some(run_date)); + + let run_date_box = unsafe { Box::from_raw(task.last_run.load(Ordering::Relaxed)) }; + maybe_next_run = get_next_run_time(&task.rules, Some(*run_date_box)); } }); }