@@ -18,7 +18,7 @@
This library is for internal use. And as such, it's only published privately.
```bash
-cargo add lool --registry=lugit --features sched
+cargo add lool --registry=lugit --features sched {sub-feature}
```
# Sub-Features
@@ -28,6 +28,10 @@ cargo add lool --registry=lugit --features sched
Enables the `tokio` runtime support, replacing the default behaviour, which implies a `std::thread` pool to run the tasks.
+> [!WARNING]
+>
+> Not yet implemented
+
### sched.rule-recurrent
Enables the "**recurrent-rule**" style for scheduling tasks.
@@ -44,13 +48,6 @@ Enables the "**recurrent-rule**" style for scheduling tasks.
- `sched.rule-pyschedule`: Enables the [python schedule](https://pypi.org/project/schedule/)-like
style for scheduling tasks
-# Usage
-
-
-
-
# Inspiration
This library is inspired by several other libraries, including:
@@ -66,7 +63,7 @@ This library is inspired by several other libraries, including:
a human-friendly API
-Otros:
+Other useful libraries that might come in handy for this project during development are:
- https://crates.io/crates/tokio-cron-scheduler
- https://crates.io/crates/clokwerk
diff --git a/lib/utils/README.md b/lib/utils/README.md
new file mode 100644
index 0000000..371c392
--- /dev/null
+++ b/lib/utils/README.md
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+lool » utils contains utilities for some of the most common tasks and operations in the Rust programming language.
+
+
+
+
+
+
+
+# Installation
+
+This library is for internal use and, as such, is only published privately.
+
+```bash
+cargo add lool --registry=lugit --features utils {sub-feature}?
+```
+
+# Sub-Features
+
+This library is divided into several sub-features, each one providing a different set of utilities. This allows you to only include the parts you need, reducing compile times.
+
+- [x] [`utils.threads`](./threads): utilities for working with threads in Rust.
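The `cargo add` command above maps to a `Cargo.toml` entry roughly like the following (a sketch; the version is a placeholder and the exact setup depends on how the `lugit` registry is configured locally):

```toml
[dependencies]
# pull in only the `utils.threads` sub-feature to keep compile times down
lool = { version = "*", registry = "lugit", features = ["utils.threads"] }
```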
+
+
\ No newline at end of file
diff --git a/lib/utils/mod.rs b/lib/utils/mod.rs
new file mode 100644
index 0000000..29faf9c
--- /dev/null
+++ b/lib/utils/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(feature = "utils.threads")]
+pub mod threads;
diff --git a/lib/utils/threads/README.md b/lib/utils/threads/README.md
new file mode 100644
index 0000000..710f82e
--- /dev/null
+++ b/lib/utils/threads/README.md
@@ -0,0 +1,72 @@
+
+
+
+
+
+
+
+lool » utils.threads contains utilities for working with threads in Rust.
+
+
+
+
+
+
+# Installation
+
+This library is for internal use and, as such, is only published privately.
+
+```bash
+cargo add lool --registry=lugit --features utils.threads
+```
+
+# Utilities
+
+## Thread Pool
+
+This is a super basic and lightweight thread pool implementation to use when there's no need for
+a more complex solution.
+
+This is basically the `ThreadPool` implementation from the book ["The Rust Programming Language" (chapter 20)](https://doc.rust-lang.org/book/ch20-00-final-project-a-web-server.html)
+by Steve Klabnik and Carol Nichols, but with some modifications and additions, inspired by the
+[threadpool](https://crates.io/crates/threadpool) crate.
+
+The `ThreadPool` implemented by the book waits for all threads to finish when the pool is dropped.
+
+This behavior is fine in some cases, for example a web server that needs to wait for all
+currently active requests to finish before shutting down. But it might not be the desired
+behavior in other cases (e.g. long-running tasks we can't afford to wait for).
+
+To keep this library generic enough, the `ThreadPool` implementation here exposes a `join` method
+that explicitly waits for all threads to finish.
+
+### More robust solutions
+
+- [rayon](https://crates.io/crates/rayon): a data parallelism library for Rust.
+- [threadpool](https://crates.io/crates/threadpool): a simple thread pool implementation (largely
+  unmaintained).
+
+# Usage
+
+```rust
+use lool::utils::threads::ThreadPool;
+
+fn main() {
+    // create a thread pool with 4 worker threads (fails if the requested size is zero)
+    let mut pool = ThreadPool::create(4).expect("failed to create the thread pool");
+
+ // spawn a bunch of tasks
+ for i in 0..8 {
+ pool.execute(move || {
+ println!("task {}", i);
+ });
+ }
+
+ // wait for all tasks to finish
+ // unlike the book's implementation, this doesn't happen automatically when the pool is dropped
+ // e.g. when the program ends, so we need to join explicitly
+ pool.join();
+}
+```
+
+Also, see the [`threadpool.rs`](../../../examples/threadpool.rs) example.
diff --git a/lib/utils/threads/mod.rs b/lib/utils/threads/mod.rs
new file mode 100644
index 0000000..9884013
--- /dev/null
+++ b/lib/utils/threads/mod.rs
@@ -0,0 +1 @@
+pub mod threadpool;
diff --git a/lib/utils/threads/threadpool.rs b/lib/utils/threads/threadpool.rs
new file mode 100644
index 0000000..87c776c
--- /dev/null
+++ b/lib/utils/threads/threadpool.rs
@@ -0,0 +1,201 @@
+use {
+ crate::fail,
+ core::fmt,
+ eyre::Result,
+ std::{
+ fmt::{Debug, Formatter},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ mpsc::{channel, Receiver, Sender},
+ Arc, Mutex,
+ },
+ thread,
+ },
+};
+
+type Job = Box<dyn FnOnce() + Send + 'static>;
+
+/// status of the thread pool
+struct PoolStatus {
+ /// the number of pending jobs in the queue
+ queued_count: AtomicUsize,
+ /// the number of currently active jobs
+ active_count: AtomicUsize,
+ /// workers count
+ pool_size: usize,
+}
+
+impl PoolStatus {
+ fn create(pool_size: usize) -> PoolStatus {
+ PoolStatus {
+ queued_count: AtomicUsize::new(0),
+ active_count: AtomicUsize::new(0),
+ pool_size,
+ }
+ }
+
+ /// returns `true` if has active work (either queued or running)
+ fn has_work(&self) -> bool {
+ self.queued_count.load(Ordering::SeqCst) > 0 || self.active_count.load(Ordering::SeqCst) > 0
+ }
+}
+
+/// 🧉 » a simple thread pool implementation
+///
+/// this is based on the example from the *"The Rust Programming Language"* book, with some
+/// modifications inspired by the [threadpool](https://crates.io/crates/threadpool) crate.
+///
+/// unlike the original implementation from the book, this one doesn't wait for all jobs to finish
+/// when the pool is dropped. Instead, it's up to the user to call `join` to wait for all jobs to
+/// finish before dropping the pool. This allows the user to stop the pool and drop immediately if
+/// that's what they want
+///
+/// **why?**
+///
+/// the implementation from the book was thought to be used in a web server where it's
+/// important to wait till all active requests are finished before shutting down the server.
+///
+/// In other scenarios, though, this might not be necessary. For example, if we have a long-running
+/// process and we need to shut down the pool, we might want to implement our own logic on how to
+/// handle the shutdown. e.g. We might wait for some jobs, but not all of them, etc.
+pub struct ThreadPool {
+    workers: Vec<Worker>,
+    job_sender: Option<Sender<Job>>,
+    status: Arc<PoolStatus>,
+}
+
+impl ThreadPool {
+ /// 🧉 » creates a new `ThreadPool`.
+ ///
+ /// The `capacity` is the number of threads in the pool.
+ ///
+ /// **Errors**
+ ///
+ /// If the `capacity` is zero, an error is returned.
+    pub fn create(capacity: usize) -> Result<ThreadPool> {
+ if capacity == 0 {
+ return fail!("ThreadPool size cannot be zero.");
+ }
+
+ let status = Arc::new(PoolStatus::create(capacity));
+
+ let (job_sender, job_receiver) = channel();
+ let receiver = Arc::new(Mutex::new(job_receiver));
+ let mut workers = Vec::with_capacity(capacity);
+
+ for id in 0..capacity {
+ workers.push(Worker::new(id, receiver.clone(), status.clone()));
+ }
+
+ Ok(ThreadPool {
+ job_sender: Some(job_sender),
+ workers,
+ status,
+ })
+ }
+
+ /// 🧉 » queues execution of a task/function in the thread pool
+ ///
+ /// the function `f` will be executed as soon as a worker thread is free.
+    pub fn execute<F>(&self, f: F)
+ where
+ F: FnOnce() + Send + 'static,
+ {
+ if let Some(ref job_sender) = self.job_sender {
+ self.status.queued_count.fetch_add(1, Ordering::SeqCst);
+ job_sender
+ .send(Box::new(f))
+ .expect("ThreadPool::execute unable to send job into queue.");
+ } else {
+ panic!("ThreadPool::execute called, but there's no job_sender... weird!");
+ }
+ }
+
+ /// 🧉 » returns the number of active jobs
+ pub fn active_jobs(&self) -> usize {
+ self.status.active_count.load(Ordering::SeqCst)
+ }
+
+ /// 🧉 » returns the number of queued jobs
+ pub fn queued_jobs(&self) -> usize {
+ self.status.queued_count.load(Ordering::Relaxed)
+ }
+
+ /// 🧉 » returns the pool size
+ pub fn pool_size(&self) -> usize {
+ self.status.pool_size
+ }
+
+ /// 🧉 » returns `true` if the pool has active work (either queued or running)
+ pub fn has_work(&self) -> bool {
+ self.status.has_work()
+ }
+
+ /// 🧉 » waits for all threads to finish their work
+ pub fn join(&mut self) {
+ drop(self.job_sender.take());
+
+ for worker in &mut self.workers {
+ println!("Shutting down worker {}", worker.id);
+
+ if let Some(thread) = worker.thread.take() {
+ thread.join().unwrap();
+ }
+ }
+ }
+}
+
+impl Debug for ThreadPool {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadPool")
+ .field("queued_jobs", &self.queued_jobs())
+ .field("active_jobs", &self.active_jobs())
+ .field("pool_size", &self.pool_size())
+ .finish()
+ }
+}
+
+/// the worker is responsible for executing the jobs on a specific thread.
+///
+/// it holds its own thread and waits for a job to be sent to it from the `ThreadPool::execute` fn.
+/// Once it receives a job, it executes it and then waits for another one, until the
+/// `job_receiver` is dropped (usually when the pool is dropped).
+struct Worker {
+ id: usize,
+    thread: Option<thread::JoinHandle<()>>,
+}
+
+impl Worker {
+ fn new(
+ id: usize,
+        job_receiver: Arc<Mutex<Receiver<Job>>>,
+ pool_status: Arc,
+ ) -> Worker {
+ let thread = thread::spawn(move || loop {
+ let message = {
+ // Only lock jobs for the time it takes to get a job, not to run it.
+ let lock = job_receiver.lock().expect("Worker thread unable to lock job_receiver");
+ lock.recv()
+ }; // drops the lock here and now others are free to get another job
+
+ let job = match message {
+ Ok(job) => job,
+ Err(..) => break,
+ };
+
+ pool_status.active_count.fetch_add(1, Ordering::SeqCst);
+ pool_status.queued_count.fetch_sub(1, Ordering::SeqCst);
+
+ job();
+
+ pool_status.active_count.fetch_sub(1, Ordering::SeqCst);
+ });
+
+ Worker {
+ id,
+ thread: Some(thread),
+ }
+ }
+}