# Making it Multithreaded

The server can only process one request at a time; this is bad no matter how fast each request is processed.

## Simulating Slow Requests

Just add a `thread::sleep(Duration::from_secs(5))` in the handler. If the server gets stuck there, other requests might be lost or not arrive on time, and that is critical!

## Using a ThreadPool

ThreadPool = group of threads ready to handle tasks.

- Don't create a thread / task per request; create a finite number of threads and handle requests from a queue of requests.

Before implementing, think about it:

- Is this the best model?
- Write the interface first
  - How would I want to interact with it?
- Implement the interfaces you needed

### 1st - Thread per request

```rust
thread::spawn(|| {
    handle_connection(stream);
});
// Send a closure with the local stream to the thread
// Bad because with a large number of requests we spawn an unbounded number of threads
// and can exhaust the system
```

### 2nd - Hypothetical ThreadPool

```rust
let pool = ThreadPool::new(4);
//...
pool.execute(|| {
    handle_connection(stream);
});
//...
```

We create a thread pool with a finite number of threads; instead of spawning per request, we just pass it the closures to execute!

### Building the ThreadPool - Compiler Driven Development

We use the errors from the compiler to tell us what is still needed: we write what we want, then compile and complete what the compiler is complaining about:

```rust
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
 --> src\main.rs:5:16
  |
5 |     let pool = ThreadPool::new(4);
  |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

// So we create struct `ThreadPool`
```

- Create `lib.rs`
- Create folder `bin` inside `src`
- Move `main.rs` to `bin` -> signifies where the binary's entry point is
- Add `use hello::ThreadPool;` in `main.rs`
- Add `pub struct ThreadPool;` to `lib.rs`

```rust
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
 --> src\bin\main.rs:6:28
  |
6 |     let pool = ThreadPool::new(4);
  |                            ^^^ function or associated item not found in `ThreadPool`

// So we write the new function
```

```rust
pub struct ThreadPool {
    num_threads: usize,
    count: u8,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool { num_threads: size, count: 0 }
    }
}
```

Then we compile again and:

```rust
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src\bin\main.rs:11:14
   |
11 |         pool.execute(|| { handle_connection(stream); });
   |              ^^^^^^^ method not found in `ThreadPool`

// So we implement execute
```

For this method we need a way to:

- Spawn threads
- An `execute` that adds the closure to a queue of other closures
- Decide which kind of closure to accept

The documentation of `thread::spawn` says:

```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
```

`spawn` accepts a function of type `FnOnce` that also implements the `Send` trait and has a `'static` lifetime, and the return type needs that trait and lifetime too. Knowing we want to execute each closure only once, `FnOnce` makes sense, and we don't care about return values for now!

```rust
pub fn execute<F>(&self, f: F)
where
    F: FnOnce() + Send + 'static,
    // FnOnce() with () because it represents a closure with no parameters and no return value,
    // aka (), done for simplification purposes
{
}
```

And with that, we compile again, but the browser will not receive anything!
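To keep the whole picture in view, here is roughly what `src/bin/main.rs` looks like at this stage. This is a sketch: the `TcpListener` setup, the `127.0.0.1:7878` address, and the body of `handle_connection` come from the single-threaded version of the server, not from these notes.

```rust
use hello::ThreadPool;
use std::net::{TcpListener, TcpStream};

fn main() {
    // Listener setup carried over from the single-threaded server
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        // The closure captures `stream` and is handed to the pool.
        // With the stubbed `execute` above it is never actually run,
        // which is why the browser gets no response yet.
        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(_stream: TcpStream) {
    // request parsing / response writing from the single-threaded chapter
}
```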
## Validating Number of Threads in `new`

I don't like `assert!`:

```rust
pub fn new(size: usize) -> ThreadPool {
    let mut pool = ThreadPool { num_threads: 0, count: 0 };
    pool.num_threads = if size > 0 { size } else { 1 };
    pool
}

// or, better:
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
    //...
}
```

## Storing threads

`thread::spawn` returns a `JoinHandle<T>`, where `T` is the return type. So we add a vector of threads instead of our bad count:

```rust
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

//..
pub fn new(size: usize) -> ThreadPool {
    let num = if size > 0 { size } else { 1 };

    let mut threads = Vec::with_capacity(num);
    // with_capacity -> allocates the memory up front (like resize or .reserve() in C++)

    for _ in 0..num {
        // create some threads and store them in the vector
    }

    ThreadPool { threads }
}
//..
```

### Workers - Intermediate between thread and task

We can't just add a live thread to the pool: when we spawn, we already have to tell the thread what to do, but we want to send it a closure later and only then execute it. So we should not store `JoinHandle<()>` directly; we store a `Worker` that holds the following:

- `id`, `JoinHandle<()>`
- Has a `Worker::new` that takes an `id` and returns an instance whose thread runs an empty closure.

`ThreadPool::new` generates each `Worker` with an `id` and stores them:

```rust
pub struct Worker {
    id: u16,
    instance: thread::JoinHandle<()>,
}

// new is not public, only a ThreadPool should initialize workers
impl Worker {
    fn new(id: u16) -> Worker {
        Worker {
            id,
            instance: thread::spawn(|| {}),
        }
    }
}

//...
for v in 0..num {
    threads.push(Worker::new(v.try_into().unwrap()));
}
//...
```

## Send requests via Channels

When the thread is spawned it starts running right away, but with nothing to do. We need a way to communicate with it and tell it what to do:

- `ThreadPool` creates a `channel` and holds the sending part of the channel
- `Worker` holds the receiver
- A `Job` struct holds the closure to be sent through the channel
- `execute` sends a `Job` using the `channel`

```rust
struct Job; // Not public, only used in the implementation

pub struct ThreadPool {
    threads: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// in ThreadPool::new()...
let (sender, receiver) = mpsc::channel(); // Create the connection
//...
ThreadPool { threads, sender }
```

### Sharing the receiver

```rust
pub struct Worker {
    id: u16,
    receiver: mpsc::Receiver<Job>,
}

// in ThreadPool::new() when creating workers
for v in 0..num {
    threads.push(Worker::new(v.try_into().unwrap(), receiver));
}
```

We can't send the receiver to multiple workers: the channel is `Multiple Producer Single Consumer`. We need to share it and consume from multiple receiving ends safely! In Chapter 16 we used `Arc` and `Mutex` to share ownership of something safely. (From here on the notes drift toward the book's naming: the `id` becomes a `usize` and the worker vector is called `workers`.)

```rust
use std::sync::Arc;
use std::sync::Mutex;
//...
pub fn new(size: usize) -> ThreadPool {
    //...
    let (sender, receiver) = mpsc::channel();

    let receiver = Arc::new(Mutex::new(receiver));
    // Safe receiver: held behind a Mutex, with an Arc to hand access to multiple workers

    let mut workers = Vec::with_capacity(size);

    for id in 0..size {
        workers.push(Worker::new(id, Arc::clone(&receiver)));
    }

    ThreadPool { workers, sender }
}
//...
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        Worker {
            id,
            instance: thread::spawn(move || {
                receiver;
            }), // Spawn a thread that takes ownership of the shared receiver (it will listen on it)
        }
    }
}
```

### Implement `execute` Method

```rust
// Instead of the empty type, we use a type alias to define a Job as any boxed closure
// that satisfies the same bounds as execute's parameter
type Job = Box<dyn FnOnce() + Send + 'static>;
// Chapter 19 - Type Aliases + Chapter 15 - Smart Pointers, using Box
```
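A quick aside (my own illustration, not from the book): the alias has to be a *boxed trait object* because every closure has its own anonymous type; boxing erases that, so one channel can carry all of them. A minimal sketch:

```rust
type Job = Box<dyn FnOnce() + Send + 'static>;

fn main() {
    // Two closures with different anonymous types both coerce to the same `Job` type,
    // so a single Vec (or channel) can carry either of them.
    let first: Job = Box::new(|| println!("handling request A"));
    let answer = 42;
    let second: Job = Box::new(move || println!("handling request B: {}", answer));

    let jobs: Vec<Job> = vec![first, second];
    for job in jobs {
        job(); // FnOnce: each boxed job is consumed when it is called
    }
}
```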
```rust
// in the impl ThreadPool...
pub fn execute<F>(&self, f: F)
where
    F: FnOnce() + Send + 'static,
{
    let job = Box::new(f);

    self.sender.send(job).unwrap();
    // send pushes the job down the channel; whichever worker locks the shared receiver
    // next will pick it up. unwrap only panics if the receiving side has been dropped.
}
//...

// in impl Worker...
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
    let instance = thread::spawn(move || loop {
        let job = receiver.lock().unwrap().recv().unwrap();
        // Lock access to the receiver and block until there is something to receive

        println!("Worker {} got a job; executing.", id);

        job();
    });

    Worker { id, instance }
}
//...
```
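To see how all the fragments fit together, here is a consolidated sketch of `lib.rs` at this point, assembled from the pieces above. It keeps these notes' `instance` field name and the clamp-to-1 validation instead of the book's `assert!`, so treat it as a sketch rather than the book's exact listing.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        let size = if size > 0 { size } else { 1 }; // validate without assert!

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    instance: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let instance = thread::spawn(move || loop {
            // The MutexGuard returned by lock() is dropped at the end of this statement,
            // so the lock is already released by the time job() runs.
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {} got a job; executing.", id);
            job();
        });

        Worker { id, instance }
    }
}
```

One detail worth noting: receiving with a single `let job = ...;` statement means the `MutexGuard` is dropped before `job()` runs. A `while let Ok(job) = receiver.lock().unwrap().recv()` loop would instead hold the lock for the whole duration of the job and serialize the workers.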