feat(sched): proper task handlers

This commit is contained in:
Lucas Colombo 2024-04-10 21:44:20 -03:00
parent 0f682a0600
commit 5b8a9faaba
Signed by: lucas
GPG Key ID: EF34786CFEFFAE35
2 changed files with 116 additions and 41 deletions

View File

@ -3,6 +3,8 @@ use lool::sched::{recur, ruleset, Scheduler};
fn my_action() {
let now = chrono::Local::now();
println!("I'm running at {}", now.format("%Y-%m-%d %H:%M:%S"));
std::thread::sleep(std::time::Duration::from_secs(15));
}
fn main() {
@ -13,18 +15,15 @@ fn main() {
std::thread::sleep(std::time::Duration::from_secs(1));
loop {
{
let task = handler.task.lock().unwrap();
let is_running = handler.is_running();
let last_run = handler.get_last_run();
let next_run = handler.get_next_run();
let name = handler.name();
let is_running = task.is_running();
let last_run = task.get_last_run();
let name = task.name();
println!(
"task {} |--> is running: {}, last run: {:?}",
name, is_running, last_run
);
}
println!(
"task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}"
);
std::thread::sleep(std::time::Duration::from_secs(2));
}

View File

@ -1,5 +1,5 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
Arc, Mutex,
};
@ -21,12 +21,14 @@ type Action = Box<dyn FnMut() + Send + Sync + 'static>;
/// status of the task.
pub struct ScheduledTask {
#[allow(dead_code)]
index: usize,
index: Arc<AtomicUsize>,
name: String,
action: Action,
rules: Vec<SchedulingRule>,
is_running: AtomicBool,
last_run: Arc<Mutex<Option<DateTime<Local>>>>, // TODO: remaining limits
rules: Arc<Vec<SchedulingRule>>,
is_running: Arc<AtomicBool>,
is_stopped: Arc<AtomicBool>,
is_removed: Arc<AtomicBool>,
last_run: Arc<AtomicPtr<DateTime<Local>>>,
}
impl ScheduledTask {
@ -35,17 +37,16 @@ impl ScheduledTask {
action();
}
pub fn get_last_run(&self) -> Option<DateTime<Local>> {
let last_run_lock = self.last_run.lock().unwrap();
last_run_lock.as_ref().cloned()
}
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Relaxed)
}
pub fn name(&self) -> &str {
&self.name
fn make_handler(&self) -> TaskHandler {
TaskHandler {
index: self.index.clone(),
name: self.name.clone(),
rules: self.rules.clone(),
is_running: self.is_running.clone(),
is_stopped: self.is_stopped.clone(),
is_removed: self.is_removed.clone(),
last_run: self.last_run.clone(),
}
}
}
@ -113,19 +114,26 @@ impl Scheduler {
let index = self.tasks.len();
let task = Arc::new(Mutex::new(ScheduledTask {
index,
index: Arc::new(AtomicUsize::new(index)),
name: name.to_string(),
action: Box::new(action),
rules,
is_running: AtomicBool::new(false),
last_run: Arc::new(Mutex::new(None)),
rules: Arc::new(rules),
is_running: Arc::new(AtomicBool::new(false)),
is_stopped: Arc::new(AtomicBool::new(false)),
is_removed: Arc::new(AtomicBool::new(false)),
last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())),
}));
self.tasks.push(task.clone());
run_in_pool(task.clone(), &self.pool);
TaskHandler { task: task.clone() }
let handler: TaskHandler = {
let task = task.lock().unwrap();
task.make_handler()
};
handler
}
}
@ -134,14 +142,77 @@ impl Scheduler {
/// returned by the `Scheduler::schedule` method,
/// this struct can be used to check and control
/// the status of the task.
#[derive(Clone)]
pub struct TaskHandler {
// HACK: holding the task in the TaskHandler is a temporal hack
// TaskHandler should hold Atomic references to the important parts of the task
// instead. e.g. is_running, last_run, etc.
// the problem is with last_run, as its a DateTime, and not a primitive type
// we could get around this by instead of holding the DateTime, holding the i64
// value (unix timestamp) and then converting it to a DateTime when needed.
pub task: Arc<Mutex<ScheduledTask>>,
name: String,
index: Arc<AtomicUsize>,
rules: Arc<Vec<SchedulingRule>>,
is_running: Arc<AtomicBool>,
is_stopped: Arc<AtomicBool>,
is_removed: Arc<AtomicBool>,
last_run: Arc<AtomicPtr<DateTime<Local>>>,
}
impl TaskHandler {
/// 🧉 » last run date
///
/// returns a `DateTime<Local>` representing the last time the task was run or None if the task
/// has never been run before.
pub fn get_last_run(&self) -> Option<DateTime<Local>> {
let last_run = self.last_run.load(Ordering::Relaxed);
if last_run.is_null() {
None
} else {
Some(unsafe { *last_run })
}
}
/// 🧉 » next run date
///
/// returns a `DateTime<Local>` representing the next time the task is scheduled to run
pub fn get_next_run(&self) -> Option<DateTime<Local>> {
get_next_run_time(&self.rules, None)
}
/// 🧉 » is running?
///
/// returns a `bool` indicating if the task is currently running in this moment
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Relaxed)
}
/// 🧉 » name
///
/// returns the name of the task
pub fn name(&self) -> &str {
&self.name
}
/// 🧉 » is active?
///
/// returns a `bool` indicating if the task is active
pub fn is_active(&self) -> bool {
!self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed)
}
/// 🧉 » is stopped?
///
/// returns a `bool` indicating if the task has been stopped
///
/// a task being stopped means that it has been paused, but not removed from the scheduler.
/// So, although it's not running, it's still in the scheduler and can be resumed.
pub fn is_stopped(&self) -> bool {
self.is_stopped.load(Ordering::Relaxed)
}
/// 🧉 » is removed?
///
/// returns a `bool` indicating if the task has been removed
///
/// once a task is removed, it's no longer in the scheduler and can't be resumed.
pub fn is_removed(&self) -> bool {
self.is_removed.load(Ordering::Relaxed)
}
}
/// **main function to run the task in the thread pool**
@ -176,11 +247,16 @@ fn run_in_pool(task_mutex: Arc<Mutex<ScheduledTask>>, pool: &ThreadPool) {
let mut task = task_mutex.lock().unwrap();
task.last_run = Arc::new(Mutex::new(Some(run_date)));
let run_date_box = Box::new(run_date);
let run_date_raw = Box::into_raw(run_date_box);
task.last_run.store(run_date_raw, Ordering::Relaxed);
task.is_running.store(true, Ordering::SeqCst);
task.run();
task.is_running.store(false, Ordering::SeqCst);
maybe_next_run = get_next_run_time(&task.rules, Some(run_date));
let run_date_box = unsafe { Box::from_raw(task.last_run.load(Ordering::Relaxed)) };
maybe_next_run = get_next_run_time(&task.rules, Some(*run_date_box));
}
});
}