From c16a52e27e4d751509b90067fca735f8dadebf11 Mon Sep 17 00:00:00 2001 From: Lucas Colombo Date: Tue, 9 Apr 2024 01:56:50 -0300 Subject: [PATCH] =?UTF-8?q?feat(utils):=20=E2=9C=A8=20threads:=20implement?= =?UTF-8?q?=20threadpool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .cocorc | 1 + .../{logo-tokio-sched.svg => logo-sched.svg} | 0 .github/img/logo-utils-threads.svg | 22 ++ .github/img/logo-utils.svg | 13 ++ Cargo.toml | 9 +- README.md | 2 +- Taskfile.yaml | 10 + examples/threadpool.rs | 32 +++ lib/cli/README.md | 4 +- lib/cli/stylize/README.md | 2 +- lib/lib.rs | 3 + lib/logger/README.md | 2 +- lib/macros/README.md | 2 +- lib/sched/README.md | 17 +- lib/utils/README.md | 30 +++ lib/utils/mod.rs | 2 + lib/utils/threads/README.md | 72 +++++++ lib/utils/threads/mod.rs | 1 + lib/utils/threads/threadpool.rs | 201 ++++++++++++++++++ 19 files changed, 406 insertions(+), 19 deletions(-) rename .github/img/{logo-tokio-sched.svg => logo-sched.svg} (100%) create mode 100644 .github/img/logo-utils-threads.svg create mode 100644 .github/img/logo-utils.svg create mode 100644 examples/threadpool.rs create mode 100644 lib/utils/README.md create mode 100644 lib/utils/mod.rs create mode 100644 lib/utils/threads/README.md create mode 100644 lib/utils/threads/mod.rs create mode 100644 lib/utils/threads/threadpool.rs diff --git a/.cocorc b/.cocorc index 9a68399..e94fe28 100644 --- a/.cocorc +++ b/.cocorc @@ -8,3 +8,4 @@ scopes: - "logger" - "macros" - "sched" + - "utils" diff --git a/.github/img/logo-tokio-sched.svg b/.github/img/logo-sched.svg similarity index 100% rename from .github/img/logo-tokio-sched.svg rename to .github/img/logo-sched.svg diff --git a/.github/img/logo-utils-threads.svg b/.github/img/logo-utils-threads.svg new file mode 100644 index 0000000..ae1c026 --- /dev/null +++ b/.github/img/logo-utils-threads.svg @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.github/img/logo-utils.svg b/.github/img/logo-utils.svg new file mode 100644 index 0000000..ffc90e8 --- /dev/null +++ b/.github/img/logo-utils.svg @@ -0,0 +1,13 @@ + + + + + + + + \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index d8e3655..227029b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,12 @@ path = "lib/lib.rs" "macros" = [] # scheduling "sched" = ["dep:chrono"] -"sched.tokio" = ["dep:tokio", "tokio?/time", "tokio?/rt"] -"sched.rule-recurrence" = [] -"sched.rule-cron" = [] +"sched.tokio" = ["dep:tokio", "tokio?/time", "tokio?/rt", "sched"] +"sched.rule-recurrence" = ["sched"] +"sched.rule-cron" = ["sched"] +# utils +"utils" = [] +"utils.threads" = ["macros", "utils", "dep:log"] [dependencies] diff --git a/README.md b/README.md index 467d2d4..fb57a38 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -

+



diff --git a/Taskfile.yaml b/Taskfile.yaml index 86afe27..150adaf 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -19,6 +19,16 @@ tasks: - cargo build --release - python check_size.py + example:logger: + desc: 🚀 run lool «example logger» + cmds: + - cargo run --example=logger --release --features logger + + example:threadpool: + desc: 🚀 run lool «example threadpool» + cmds: + - cargo run --example=threadpool --release --features utils.threads + fmt: desc: 🎨 format lool cmds: diff --git a/examples/threadpool.rs b/examples/threadpool.rs new file mode 100644 index 0000000..60c21d2 --- /dev/null +++ b/examples/threadpool.rs @@ -0,0 +1,32 @@ +use { + eyre::{Ok, Result}, + lool::utils::threads::threadpool::ThreadPool, +}; + +fn job(id: usize) { + // wait for a while + std::thread::sleep(std::time::Duration::from_secs(1)); + println!("job {}", id); +} + +fn main() -> Result<()> { + let pool = ThreadPool::create(4)?; + + for i in 0..10 { + pool.execute(move || job(i)); + } + + // wait for all jobs to finish + // another way to do this is to call `pool.join()` but I want to log stuff in the meantime + loop { + println!("{:?}", pool); + if !pool.has_work() { + break; + } + + // wait for a while + std::thread::sleep(std::time::Duration::from_secs(1)); + } + + Ok(()) +} diff --git a/lib/cli/README.md b/lib/cli/README.md index 7b90cd9..7a5cc5c 100644 --- a/lib/cli/README.md +++ b/lib/cli/README.md @@ -1,4 +1,4 @@ -

+



@@ -16,7 +16,7 @@ This crate is for internal use. It's only published privately. ```bash -cargo add lool --registry=lugit --features cli +cargo add lool --registry=lugit --features cli {sub-feature} ``` # Sub-Features diff --git a/lib/cli/stylize/README.md b/lib/cli/stylize/README.md index 1a20a7d..5db580c 100644 --- a/lib/cli/stylize/README.md +++ b/lib/cli/stylize/README.md @@ -1,4 +1,4 @@ -

+



diff --git a/lib/lib.rs b/lib/lib.rs index e793ca8..ca88306 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -9,3 +9,6 @@ pub mod logger; #[cfg(feature = "macros")] pub mod macros; + +#[cfg(feature = "utils")] +pub mod utils; diff --git a/lib/logger/README.md b/lib/logger/README.md index b76f03c..8a07193 100644 --- a/lib/logger/README.md +++ b/lib/logger/README.md @@ -1,4 +1,4 @@ -

+



diff --git a/lib/macros/README.md b/lib/macros/README.md index dd1b2e3..82c3041 100644 --- a/lib/macros/README.md +++ b/lib/macros/README.md @@ -1,4 +1,4 @@ -

+



diff --git a/lib/sched/README.md b/lib/sched/README.md index f814b5d..c963b43 100644 --- a/lib/sched/README.md +++ b/lib/sched/README.md @@ -1,4 +1,4 @@ -

+



@@ -18,7 +18,7 @@ This library is for internal use. And as such, it's only published privately. ```bash -cargo add lool --registry=lugit --features sched +cargo add lool --registry=lugit --features sched {sub-feature} ``` # Sub-Features @@ -28,6 +28,10 @@ cargo add lool --registry=lugit --features sched Enables the `tokio` runtime support, replacing the default behaviour, which implies a `std::thread` pool to run the tasks. +> [!WARNING] +> +> Not yet implemented + ### has subfeatures  sched.rule-recurrent Enables the "**recurrent-rule**" style for scheduling tasks. @@ -44,13 +48,6 @@ Enables the "**recurrent-rule**" style for scheduling tasks. - `sched.rule-pyschedule`: Enables the [python schedule](https://pypi.org/project/schedule/)-like style for scheduling tasks -# Usage - - - - # Inspiration This library is inspired by several other libraries, including: @@ -66,7 +63,7 @@ This library is inspired by several other libraries, including: a human-friendly API -Otros: +Other useful libraries that might come in handy for this project during development are: - https://crates.io/crates/tokio-cron-scheduler - https://crates.io/crates/clokwerk diff --git a/lib/utils/README.md b/lib/utils/README.md new file mode 100644 index 0000000..371c392 --- /dev/null +++ b/lib/utils/README.md @@ -0,0 +1,30 @@ +

+ +
+
+
+ +

+lool » utils contains utilities for some of the most common tasks and operations in the Rust programming language. +

+ + +
+
+
+ +# Installation + +This library is for internal use. And as such, it's only published privately. + +```bash +cargo add lool --registry=lugit --features utils {sub-feature}? +``` + +# Sub-Features + +This library is divided into several sub-features, each one providing a different set of utilities. This allows you to only include the parts you need, reducing compile times. + +- [x] [`utils.threads`](./threads): utilities for working with threads in Rust. + + \ No newline at end of file diff --git a/lib/utils/mod.rs b/lib/utils/mod.rs new file mode 100644 index 0000000..29faf9c --- /dev/null +++ b/lib/utils/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "utils.threads")] +pub mod threads; diff --git a/lib/utils/threads/README.md b/lib/utils/threads/README.md new file mode 100644 index 0000000..710f82e --- /dev/null +++ b/lib/utils/threads/README.md @@ -0,0 +1,72 @@ +

+ +
+
+
+ +

+lool » utils.threads contains utilities for working with threads in Rust. +

+ +
+
+
+ +# Installation + +This library is for internal use. And as such, it's only published privately. + +```bash +cargo add lool --registry=lugit --features utils.threads +``` + +# Utilities + +## Thread Pool + +This is a super basic and lightweight thread pool implementation to use when there's not need for +a more complex solution. + +This is basically, the `ThreadPool` implementation from the book ["The Rust Programming Language" (chapter 20)](https://doc.rust-lang.org/book/ch20-00-final-project-a-web-server.html) +by Steve Klabnik and Carol Nichols, but with some modifications and additions, inspired by the +[threadpool](https://crates.io/crates/threadpool) crate. + +The `ThreadPool` implemented by the book waits for all threads to finish when the pool is dropped. + +This behavior is ok for some cases, for example, for a web server that needs to wait for all +currently active requests to finish before shutting down. But it might not be the desired behavior +for other cases (e.g. long-running tasks when we can't wait for a thread to finish). + +To keep this library generic enough, the `ThreadPool` implementation here exposes a `join` method +that explicitly waits for all threads to finish. + +### More robust solutions + +- [rayon](https://crates.io/crates/rayon): a data parallelism library for Rust. +- [threadpool](https://crates.io/crates/threadpool): a simple thread pool implementation (quite + abandoned). + +# Usage + +```rust +use lool::utils::threads::ThreadPool; + +fn main() { + // create a thread pool with 4 threads + let pool = ThreadPool::new(4); + + // spawn a bunch of tasks + for i in 0..8 { + pool.execute(move || { + println!("task {}", i); + }); + } + + // wait for all tasks to finish + // unlike the book's implementation, this doesn't happen automatically when the pool is dropped + // e.g. when the program ends, so we need to join explicitly + pool.join(); +} +``` + +Also, see the [`threadpool.rs`](../../../examples/threadpool.rs) example. diff --git a/lib/utils/threads/mod.rs b/lib/utils/threads/mod.rs new file mode 100644 index 0000000..9884013 --- /dev/null +++ b/lib/utils/threads/mod.rs @@ -0,0 +1 @@ +pub mod threadpool; diff --git a/lib/utils/threads/threadpool.rs b/lib/utils/threads/threadpool.rs new file mode 100644 index 0000000..87c776c --- /dev/null +++ b/lib/utils/threads/threadpool.rs @@ -0,0 +1,201 @@ +use { + crate::fail, + core::fmt, + eyre::Result, + std::{ + fmt::{Debug, Formatter}, + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, + thread, + }, +}; + +type Job = Box; + +/// status of the thread pool +struct PoolStatus { + /// the number of pending jobs in the queue + queued_count: AtomicUsize, + /// the number of currently active jobs + active_count: AtomicUsize, + /// workers count + pool_size: usize, +} + +impl PoolStatus { + fn create(pool_size: usize) -> PoolStatus { + PoolStatus { + queued_count: AtomicUsize::new(0), + active_count: AtomicUsize::new(0), + pool_size, + } + } + + /// returns `true` if has active work (either queued or running) + fn has_work(&self) -> bool { + self.queued_count.load(Ordering::SeqCst) > 0 || self.active_count.load(Ordering::SeqCst) > 0 + } +} + +/// 🧉 » a simple thread pool implementation +/// +/// this is based on the example from the *"The Rust Programming Language"* book, with some +/// modifications inspired by the [threadpool](https://crates.io/crates/threadpool) crate. +/// +/// unlike the original implementation from the book, this one doesn't wait for all jobs to finish +/// when the pool is dropped. Instead, it's up to the user to call `join` to wait for all jobs to +/// finish before dropping the pool. This allows the user to stop the pool and drop immediately if +/// that's what they want +/// +/// **why?** +/// +/// the implementation from the book was thought to be used in a web server where it's +/// important to wait till all active requests are finished before shutting down the server. +/// +/// In other scenarios, though, this might not be necessary. For example, if we have a long-running +/// process and we need to shut down the pool, we might want to implement our own logic on how to +/// handle the shutdown. e.g. We might wait for some jobs, but not all of them, etc. +pub struct ThreadPool { + workers: Vec, + job_sender: Option>, + status: Arc, +} + +impl ThreadPool { + /// 🧉 » creates a new `ThreadPool`. + /// + /// The `capacity` is the number of threads in the pool. + /// + /// **Errors** + /// + /// If the `capacity` is zero, an error is returned. + pub fn create(capacity: usize) -> Result { + if capacity == 0 { + return fail!("ThreadPool size cannot be zero."); + } + + let status = Arc::new(PoolStatus::create(capacity)); + + let (job_sender, job_receiver) = channel(); + let receiver = Arc::new(Mutex::new(job_receiver)); + let mut workers = Vec::with_capacity(capacity); + + for id in 0..capacity { + workers.push(Worker::new(id, receiver.clone(), status.clone())); + } + + Ok(ThreadPool { + job_sender: Some(job_sender), + workers, + status, + }) + } + + /// 🧉 » queues execution of a task/function in the thread pool + /// + /// the function `f` will be executed as soon as a worker thread is free. + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + if let Some(ref job_sender) = self.job_sender { + self.status.queued_count.fetch_add(1, Ordering::SeqCst); + job_sender + .send(Box::new(f)) + .expect("ThreadPool::execute unable to send job into queue."); + } else { + panic!("ThreadPool::execute called, but there's no job_sender... weird!"); + } + } + + /// 🧉 » returns the number of active jobs + pub fn active_jobs(&self) -> usize { + self.status.active_count.load(Ordering::SeqCst) + } + + /// 🧉 » returns the number of queued jobs + pub fn queued_jobs(&self) -> usize { + self.status.queued_count.load(Ordering::Relaxed) + } + + /// 🧉 » returns the pool size + pub fn pool_size(&self) -> usize { + self.status.pool_size + } + + /// 🧉 » returns `true` if the pool has active work (either queued or running) + pub fn has_work(&self) -> bool { + self.status.has_work() + } + + /// 🧉 » waits for all threads to finish their work + pub fn join(&mut self) { + drop(self.job_sender.take()); + + for worker in &mut self.workers { + println!("Shutting down worker {}", worker.id); + + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + } + } + } +} + +impl Debug for ThreadPool { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadPool") + .field("queued_jobs", &self.queued_jobs()) + .field("active_jobs", &self.active_jobs()) + .field("pool_size", &self.pool_size()) + .finish() + } +} + +/// the worker is responsible for executing the jobs on a specific thread. +/// +/// it holds its own thread and waits for a job to be sent to it from the `ThreadPool::execute` fn. +/// Once it receives a job, it +/// executes +/// it and then waits for another job, until the `job_receiver` is dropped (usually when the pool +/// is dropped). +struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn new( + id: usize, + job_receiver: Arc>>, + pool_status: Arc, + ) -> Worker { + let thread = thread::spawn(move || loop { + let message = { + // Only lock jobs for the time it takes to get a job, not to run it. + let lock = job_receiver.lock().expect("Worker thread unable to lock job_receiver"); + lock.recv() + }; // drops the lock here and now others are free to get another job + + let job = match message { + Ok(job) => job, + Err(..) => break, + }; + + pool_status.active_count.fetch_add(1, Ordering::SeqCst); + pool_status.queued_count.fetch_sub(1, Ordering::SeqCst); + + job(); + + pool_status.active_count.fetch_sub(1, Ordering::SeqCst); + }); + + Worker { + id, + thread: Some(thread), + } + } +}