feat(sched): ✨ use thread instead of pool
This commit is contained in:
parent
5b8a9faaba
commit
2d3fcbaabd
@ -1,4 +1,11 @@
|
|||||||
use lool::sched::{recur, ruleset, Scheduler};
|
use {
|
||||||
|
eyre::{set_hook, DefaultHandler},
|
||||||
|
lool::sched::{recur, ruleset, Scheduler},
|
||||||
|
};
|
||||||
|
|
||||||
|
fn setup_eyre() {
|
||||||
|
let _ = set_hook(Box::new(DefaultHandler::default_with));
|
||||||
|
}
|
||||||
|
|
||||||
fn my_action() {
|
fn my_action() {
|
||||||
let now = chrono::Local::now();
|
let now = chrono::Local::now();
|
||||||
@ -8,6 +15,8 @@ fn my_action() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
setup_eyre();
|
||||||
|
|
||||||
let mut sched = Scheduler::new();
|
let mut sched = Scheduler::new();
|
||||||
|
|
||||||
let handler = sched.schedule("test-task", my_action, recur(ruleset().at_second(0)));
|
let handler = sched.schedule("test-task", my_action, recur(ruleset().at_second(0)));
|
||||||
@ -20,11 +29,16 @@ fn main() {
|
|||||||
let next_run = handler.get_next_run();
|
let next_run = handler.get_next_run();
|
||||||
let name = handler.name();
|
let name = handler.name();
|
||||||
|
|
||||||
println!(
|
println!("task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}");
|
||||||
"task {name} |--> running: {is_running}, last: {last_run:?}, next: {next_run:?}"
|
|
||||||
);
|
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(60));
|
||||||
|
|
||||||
std::thread::sleep(std::time::Duration::from_secs(2));
|
let result = sched.remove(&handler);
|
||||||
|
|
||||||
|
if result.is_ok() {
|
||||||
|
println!("task {name} removed");
|
||||||
|
} else {
|
||||||
|
println!("task {name} not present in the scheduler");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,11 +1,16 @@
|
|||||||
use std::sync::{
|
use std::{
|
||||||
atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
|
collections::HashMap,
|
||||||
Arc, Mutex,
|
sync::{
|
||||||
|
atomic::{AtomicBool, AtomicPtr, Ordering},
|
||||||
|
Arc, Mutex,
|
||||||
|
},
|
||||||
|
thread::{self, JoinHandle},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use eyre::{eyre, Result};
|
||||||
|
|
||||||
use {
|
use {
|
||||||
super::SchedulingRule,
|
super::SchedulingRule,
|
||||||
crate::utils::threads::threadpool::ThreadPool,
|
|
||||||
chrono::{DateTime, Local},
|
chrono::{DateTime, Local},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -21,7 +26,6 @@ type Action = Box<dyn FnMut() + Send + Sync + 'static>;
|
|||||||
/// status of the task.
|
/// status of the task.
|
||||||
pub struct ScheduledTask {
|
pub struct ScheduledTask {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
index: Arc<AtomicUsize>,
|
|
||||||
name: String,
|
name: String,
|
||||||
action: Action,
|
action: Action,
|
||||||
rules: Arc<Vec<SchedulingRule>>,
|
rules: Arc<Vec<SchedulingRule>>,
|
||||||
@ -39,7 +43,6 @@ impl ScheduledTask {
|
|||||||
|
|
||||||
fn make_handler(&self) -> TaskHandler {
|
fn make_handler(&self) -> TaskHandler {
|
||||||
TaskHandler {
|
TaskHandler {
|
||||||
index: self.index.clone(),
|
|
||||||
name: self.name.clone(),
|
name: self.name.clone(),
|
||||||
rules: self.rules.clone(),
|
rules: self.rules.clone(),
|
||||||
is_running: self.is_running.clone(),
|
is_running: self.is_running.clone(),
|
||||||
@ -48,6 +51,14 @@ impl ScheduledTask {
|
|||||||
last_run: self.last_run.clone(),
|
last_run: self.last_run.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_active(&self) -> bool {
|
||||||
|
!self.is_stopped.load(Ordering::Relaxed) && !self.is_removed.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_removed(&self) -> bool {
|
||||||
|
self.is_removed.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 🧉 » a task scheduler.
|
/// 🧉 » a task scheduler.
|
||||||
@ -57,8 +68,7 @@ impl ScheduledTask {
|
|||||||
///
|
///
|
||||||
/// Each task can have n rules, and the task will be executed when any of the rules is met.
|
/// Each task can have n rules, and the task will be executed when any of the rules is met.
|
||||||
pub struct Scheduler {
|
pub struct Scheduler {
|
||||||
pool: ThreadPool,
|
tasks: HashMap<String, Arc<Mutex<ScheduledTask>>>,
|
||||||
tasks: Vec<Arc<Mutex<ScheduledTask>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Scheduler {
|
impl Default for Scheduler {
|
||||||
@ -69,23 +79,9 @@ impl Default for Scheduler {
|
|||||||
|
|
||||||
impl Scheduler {
|
impl Scheduler {
|
||||||
/// 🧉 » create a new scheduler
|
/// 🧉 » create a new scheduler
|
||||||
///
|
|
||||||
/// default constructor, sets the internal thread pool to have 5 threads at most.
|
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
tasks: vec![],
|
tasks: HashMap::new(),
|
||||||
pool: ThreadPool::create(5).unwrap(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// 🧉 » create a new scheduler
|
|
||||||
///
|
|
||||||
/// creates a new scheduler, just like `Scheduler::new`, but with a specific capacity for the
|
|
||||||
/// internal thread pool.
|
|
||||||
pub fn with_capacity(capacity: usize) -> Self {
|
|
||||||
Self {
|
|
||||||
tasks: vec![],
|
|
||||||
pool: ThreadPool::create(capacity).unwrap(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,10 +107,7 @@ impl Scheduler {
|
|||||||
where
|
where
|
||||||
F: FnMut() + Send + Sync + 'static,
|
F: FnMut() + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
let index = self.tasks.len();
|
|
||||||
|
|
||||||
let task = Arc::new(Mutex::new(ScheduledTask {
|
let task = Arc::new(Mutex::new(ScheduledTask {
|
||||||
index: Arc::new(AtomicUsize::new(index)),
|
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
action: Box::new(action),
|
action: Box::new(action),
|
||||||
rules: Arc::new(rules),
|
rules: Arc::new(rules),
|
||||||
@ -124,9 +117,10 @@ impl Scheduler {
|
|||||||
last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())),
|
last_run: Arc::new(AtomicPtr::new(std::ptr::null_mut())),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
self.tasks.push(task.clone());
|
self.tasks.insert(name.to_string(), task.clone());
|
||||||
|
|
||||||
run_in_pool(task.clone(), &self.pool);
|
// launch the task in its own thread
|
||||||
|
spawn_task(task.clone());
|
||||||
|
|
||||||
let handler: TaskHandler = {
|
let handler: TaskHandler = {
|
||||||
let task = task.lock().unwrap();
|
let task = task.lock().unwrap();
|
||||||
@ -135,6 +129,59 @@ impl Scheduler {
|
|||||||
|
|
||||||
handler
|
handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 🧉 » stop a task
|
||||||
|
pub fn stop(&mut self, handler: &TaskHandler) -> Result<()> {
|
||||||
|
// get the task from the tasks map
|
||||||
|
let task = self.tasks.get(handler.name());
|
||||||
|
|
||||||
|
if let Some(task) = task {
|
||||||
|
if let Ok(task) = task.lock() {
|
||||||
|
task.is_stopped.store(true, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(eyre!("error stopping task {}", handler.name()))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(eyre!("task {} was not found", handler.name()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 🧉 » resume a task
|
||||||
|
pub fn resume(&mut self, handler: &TaskHandler) -> Result<()> {
|
||||||
|
// get the task from the tasks map
|
||||||
|
let task = self.tasks.get(handler.name());
|
||||||
|
|
||||||
|
if let Some(task) = task {
|
||||||
|
if let Ok(task) = task.lock() {
|
||||||
|
task.is_stopped.store(false, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(eyre!("error resuming task {}", handler.name()))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(eyre!("task {} was not found", handler.name()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 🧉 » remove a task
|
||||||
|
pub fn remove(&mut self, handler: &TaskHandler) -> Result<()> {
|
||||||
|
// get the task from the tasks map
|
||||||
|
let task = self.tasks.remove(handler.name());
|
||||||
|
|
||||||
|
if let Some(task) = task {
|
||||||
|
if let Ok(task) = &mut task.lock() {
|
||||||
|
task.is_removed.store(true, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(eyre!("error removing task {}", handler.name()))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
handler.is_removed.store(true, Ordering::Relaxed);
|
||||||
|
Err(eyre!("task {} was not found", handler.name()))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 🧉 » task handler
|
/// 🧉 » task handler
|
||||||
@ -145,7 +192,6 @@ impl Scheduler {
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TaskHandler {
|
pub struct TaskHandler {
|
||||||
name: String,
|
name: String,
|
||||||
index: Arc<AtomicUsize>,
|
|
||||||
rules: Arc<Vec<SchedulingRule>>,
|
rules: Arc<Vec<SchedulingRule>>,
|
||||||
is_running: Arc<AtomicBool>,
|
is_running: Arc<AtomicBool>,
|
||||||
is_stopped: Arc<AtomicBool>,
|
is_stopped: Arc<AtomicBool>,
|
||||||
@ -171,7 +217,11 @@ impl TaskHandler {
|
|||||||
///
|
///
|
||||||
/// returns a `DateTime<Local>` representing the next time the task is scheduled to run
|
/// returns a `DateTime<Local>` representing the next time the task is scheduled to run
|
||||||
pub fn get_next_run(&self) -> Option<DateTime<Local>> {
|
pub fn get_next_run(&self) -> Option<DateTime<Local>> {
|
||||||
get_next_run_time(&self.rules, None)
|
if self.is_active() {
|
||||||
|
return get_next_run_time(&self.rules, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 🧉 » is running?
|
/// 🧉 » is running?
|
||||||
@ -215,12 +265,11 @@ impl TaskHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// **main function to run the task in the thread pool**
|
/// **main function to run the task in its own thread**
|
||||||
///
|
///
|
||||||
/// it spawns a new job in the thread pool to run the task until the task is no longer scheduled to
|
/// it creates a new thread and runs the task according to its scheduling rules.
|
||||||
/// run.
|
fn spawn_task(task_mutex: Arc<Mutex<ScheduledTask>>) -> JoinHandle<()> {
|
||||||
fn run_in_pool(task_mutex: Arc<Mutex<ScheduledTask>>, pool: &ThreadPool) {
|
let thread = thread::spawn(move || {
|
||||||
pool.execute(move || {
|
|
||||||
let (mut maybe_next_run, name) = {
|
let (mut maybe_next_run, name) = {
|
||||||
let task = task_mutex.lock().unwrap();
|
let task = task_mutex.lock().unwrap();
|
||||||
let rules = &task.rules;
|
let rules = &task.rules;
|
||||||
@ -247,18 +296,25 @@ fn run_in_pool(task_mutex: Arc<Mutex<ScheduledTask>>, pool: &ThreadPool) {
|
|||||||
|
|
||||||
let mut task = task_mutex.lock().unwrap();
|
let mut task = task_mutex.lock().unwrap();
|
||||||
|
|
||||||
let run_date_box = Box::new(run_date);
|
if task.is_active() {
|
||||||
let run_date_raw = Box::into_raw(run_date_box);
|
let run_date_box = Box::new(run_date);
|
||||||
task.last_run.store(run_date_raw, Ordering::Relaxed);
|
let run_date_raw = Box::into_raw(run_date_box);
|
||||||
|
|
||||||
task.is_running.store(true, Ordering::SeqCst);
|
task.last_run.store(run_date_raw, Ordering::Relaxed);
|
||||||
task.run();
|
task.is_running.store(true, Ordering::SeqCst);
|
||||||
task.is_running.store(false, Ordering::SeqCst);
|
task.run();
|
||||||
|
task.is_running.store(false, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
|
||||||
let run_date_box = unsafe { Box::from_raw(task.last_run.load(Ordering::Relaxed)) };
|
if !task.is_removed() {
|
||||||
maybe_next_run = get_next_run_time(&task.rules, Some(*run_date_box));
|
maybe_next_run = get_next_run_time(&task.rules, Some(run_date));
|
||||||
|
} else {
|
||||||
|
maybe_next_run = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
thread
|
||||||
}
|
}
|
||||||
|
|
||||||
/// **get next run time**
|
/// **get next run time**
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user