From 2d3fcbaabd6aa6e16fff0e79c1c503554b281408 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Thu, 11 Apr 2024 00:15:12 -0300 Subject: [PATCH] =?UTF-8?q?feat(sched):=20=E2=9C=A8=20use=20thread=20inste?= =?UTF-8?q?ad=20of=20pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/sched.rs | 26 ++++++-- lib/sched/scheduler.rs | 142 ++++++++++++++++++++++++++++------------- 2 files changed, 119 insertions(+), 49 deletions(-) diff --git a/examples/sched.rs b/examples/sched.rs index 8070c94..cc291e1 100644 --- a/examples/sched.rs +++ b/examples/sched.rs @@ -1,4 +1,11 @@ -use lool::sched::{recur, ruleset, Scheduler}; +use { + eyre::{set_hook, DefaultHandler}, + lool::sched::{recur, ruleset, Scheduler}, +}; + +fn setup_eyre() { + let _ = set_hook(Box::new(DefaultHandler::default_with)); +} fn my_action() { let now = chrono::Local::now(); @@ -8,6 +15,8 @@ fn my_action() { } fn main() { + setup_eyre(); + let mut sched = Scheduler::new(); let handler = sched.schedule("test-task", my_action, recur(ruleset().at_second(0))); @@ -20,11 +29,16 @@ fn main() { let next_run = handler.get_next_run(); let name = handler.name(); - println!( - "task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}" - ); - + println!("task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}"); - std::thread::sleep(std::time::Duration::from_secs(2)); + std::thread::sleep(std::time::Duration::from_secs(60)); + + let result = sched.remove(&handler); + + if result.is_ok() { + println!("task {name} removed"); + } else { + println!("task {name} not present in the scheduler"); + } } } diff --git a/lib/sched/scheduler.rs b/lib/sched/scheduler.rs index 7d814e9..5908f5d 100644 --- a/lib/sched/scheduler.rs +++ b/lib/sched/scheduler.rs @@ -1,11 +1,16 @@ -use std::sync::{ - atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering}, - Arc, Mutex, +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, AtomicPtr, Ordering}, + Arc, Mutex, + }, + thread::{self, JoinHandle}, }; +use eyre::{eyre, Result}; + use { super::SchedulingRule, - crate::utils::threads::threadpool::ThreadPool, chrono::{DateTime, Local}, }; @@ -21,7 +26,6 @@ type Action = Box; /// status of the task. pub struct ScheduledTask { #[allow(dead_code)] - index: Arc, name: String, action: Action, rules: Arc>, @@ -39,7 +43,6 @@ impl ScheduledTask { fn make_handler(&self) -> TaskHandler { TaskHandler { - index: self.index.clone(), name: self.name.clone(), rules: self.rules.clone(), is_running: self.is_running.clone(), @@ -48,6 +51,14 @@ impl ScheduledTask { last_run: self.last_run.clone(), } } + + fn is_active(&self) -> bool { + !self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed) + } + + fn is_removed(&self) -> bool { + self.is_removed.load(Ordering::Relaxed) + } } /// 🧉 » a task scheduler. @@ -57,8 +68,7 @@ impl ScheduledTask { /// /// Each task can have n rules, and the task will be executed when any of the rules is met. pub struct Scheduler { - pool: ThreadPool, - tasks: Vec>>, + tasks: HashMap>>, } impl Default for Scheduler { @@ -69,23 +79,9 @@ impl Default for Scheduler { impl Scheduler { /// 🧉 » create a new scheduler - /// - /// default constructor, sets the internal thread pool to have 5 threads at most. pub fn new() -> Self { Self { - tasks: vec![], - pool: ThreadPool::create(5).unwrap(), - } - } - - /// 🧉 » create a new scheduler - /// - /// creates a new scheduler, just like `Scheduler::new`, but with a specific capacity for the - /// internal thread pool. - pub fn with_capacity(capacity: usize) -> Self { - Self { - tasks: vec![], - pool: ThreadPool::create(capacity).unwrap(), + tasks: HashMap::new(), } } @@ -111,10 +107,7 @@ impl Scheduler { where F: FnMut() + Send + Sync + 'static, { - let index = self.tasks.len(); - let task = Arc::new(Mutex::new(ScheduledTask { - index: Arc::new(AtomicUsize::new(index)), name: name.to_string(), action: Box::new(action), rules: Arc::new(rules), @@ -124,9 +117,10 @@ impl Scheduler { last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())), })); - self.tasks.push(task.clone()); + self.tasks.insert(name.to_string(), task.clone()); - run_in_pool(task.clone(), &self.pool); + // launch the task in its own thread + spawn_task(task.clone()); let handler: TaskHandler = { let task = task.lock().unwrap(); @@ -135,6 +129,59 @@ impl Scheduler { handler } + + /// 🧉 » stop a task + pub fn stop(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + if let Ok(task) = task.lock() { + task.is_stopped.store(true, Ordering::Relaxed); + Ok(()) + } else { + Err(eyre!("error stopping task {}", handler.name())) + } + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » resume a task + pub fn resume(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.get(handler.name()); + + if let Some(task) = task { + if let Ok(task) = task.lock() { + task.is_stopped.store(false, Ordering::Relaxed); + Ok(()) + } else { + Err(eyre!("error resuming task {}", handler.name())) + } + } else { + Err(eyre!("task {} was not found", handler.name())) + } + } + + /// 🧉 » remove a task + pub fn remove(&mut self, handler: &TaskHandler) -> Result<()> { + // get the task from the tasks map + let task = self.tasks.remove(handler.name()); + + if let Some(task) = task { + if let Ok(task) = &mut task.lock() { + task.is_removed.store(true, Ordering::Relaxed); + + Ok(()) + } else { + Err(eyre!("error removing task {}", handler.name())) + } + } else { + handler.is_removed.store(true, Ordering::Relaxed); + Err(eyre!("task {} was not found", handler.name())) + } + } } /// 🧉 » task handler @@ -145,7 +192,6 @@ impl Scheduler { #[derive(Clone)] pub struct TaskHandler { name: String, - index: Arc, rules: Arc>, is_running: Arc, is_stopped: Arc, @@ -171,7 +217,11 @@ impl TaskHandler { /// /// returns a `DateTime` representing the next time the task is scheduled to run pub fn get_next_run(&self) -> Option> { - get_next_run_time(&self.rules, None) + if self.is_active() { + return get_next_run_time(&self.rules, None); + } + + None } /// 🧉 » is running? @@ -215,12 +265,11 @@ impl TaskHandler { } } -/// **main function to run the task in the thread pool** +/// **main function to run the task in its own thread** /// -/// it spawns a new job in the thread pool to run the task until the task is no longer scheduled to -/// run. -fn run_in_pool(task_mutex: Arc>, pool: &ThreadPool) { - pool.execute(move || { +/// it creates a new thread and runs the task according to its scheduling rules. +fn spawn_task(task_mutex: Arc>) -> JoinHandle<()> { + let thread = thread::spawn(move || { let (mut maybe_next_run, name) = { let task = task_mutex.lock().unwrap(); let rules = &task.rules; @@ -247,18 +296,25 @@ fn run_in_pool(task_mutex: Arc>, pool: &ThreadPool) { let mut task = task_mutex.lock().unwrap(); - let run_date_box = Box::new(run_date); - let run_date_raw = Box::into_raw(run_date_box); - task.last_run.store(run_date_raw, Ordering::Relaxed); + if task.is_active() { + let run_date_box = Box::new(run_date); + let run_date_raw = Box::into_raw(run_date_box); - task.is_running.store(true, Ordering::SeqCst); - task.run(); - task.is_running.store(false, Ordering::SeqCst); + task.last_run.store(run_date_raw, Ordering::Relaxed); + task.is_running.store(true, Ordering::SeqCst); + task.run(); + task.is_running.store(false, Ordering::SeqCst); + } - let run_date_box = unsafe { Box::from_raw(task.last_run.load(Ordering::Relaxed)) }; - maybe_next_run = get_next_run_time(&task.rules, Some(*run_date_box)); + if !task.is_removed() { + maybe_next_run = get_next_run_time(&task.rules, Some(run_date)); + } else { + maybe_next_run = None; + } } }); + + thread } /// **get next run time**