# thread_io [![docs.rs](https://docs.rs/thread_io/badge.svg)](https://docs.rs/thread_io/latest/thread_io) [![crates.io](https://img.shields.io/crates/v/thread_io.svg)](https://crates.io/crates/thread_io) [![Build status](https://github.com/markschl/thread_io/workflows/ci/badge.svg)](https://github.com/markschl/thread_io/actions) This crate allows to easily wrap readers and writers in a background thread. This can be useful e.g. with readers and writers for compression formats to reduce load on the main thread. `thread_io` uses channels (optionally from the [crossbeam crate](https://docs.rs/crossbeam/latest/crossbeam/channel/index.html)) for communicating and exchanging chunks of data with the background reader / writer. **[Reader API documentation](https://docs.rs/thread_io/latest/thread_io/read)** **[Writer API documentation](https://docs.rs/thread_io/latest/thread_io/write)** The minimum Rust version is *1.38.0*. ## Examples ### Reading The following code counts the number of lines containing *spam* in a gzip compressed file. Decompression is done in a background thread using the `flate2` library, and the decompressed data is sent to a reader supplied to a closure in the main thread. The speed gain should be highest if decompression and text searching use about the same amount of CPU time. The resulting line number should be the same as the output of `zcat file.txt.gz | grep 'spam' | wc -l`. ```rust use io::prelude::*; use io; use fs::File; use thread_io::read::reader; use flate2::read::GzDecoder; // size of buffers sent across threads const BUF_SIZE: usize = 256 * 1024; // length of queue with buffers pre-filled in background thread const QUEUE_LEN: usize = 5; let f = File::open("file.txt.gz").unwrap(); let gz = GzDecoder::new(f); let search_term = "spam"; let found = reader( BUF_SIZE, QUEUE_LEN, gz, |reader| { let mut buf_reader = io::BufReader::new(reader); let mut found = 0; let mut line = String::new(); while buf_reader.read_line(&mut line)? > 0 { if line.contains(search_term) { found += 1; } line.clear(); } Ok::<_, io::Error>(found) } ) .expect("decoding error"); println!("Found '{}' in {} lines.", search_term, found); ``` Note that this is an example for illustration. To increase performance, one could read lines into a `Vec` buffer (instead of `String`) and search for *spam* e.g. using `memchr` from the [memchr crate](https://crates.io/crates/memchr). The compiler sometimes needs a hint about the exact error type returned from `func`, in this case this was done by specifying `Ok::<_, io::Error>()` as return value. `thread_io::read::reader` requires the underlying reader to implement `Send`. Unfortunately, this is not always the case, such as with `io::StdinLock`. There is the [`thread_io::read::reader_init`](https://docs.rs/thread_io/latest/thread_io/read/fn.reader_init.html) function to handle such cases. ### Writing Writing to a gzip compressed file in a background thread works similarly as reading. The following code writes all lines containing *spam* to a compressed file. The contents of the compressed output file `file.txt.gz` should be the same as if running `grep 'spam' file.txt | gzip -c > file.txt.gz` ```rust use fs::File; use io::prelude::*; use io; use thread_io::write::writer; use flate2::write::{GzEncoder}; use flate2::Compression; const BUF_SIZE: usize = 256 * 1024; const QUEUE_LEN: usize = 5; let infile = File::open("file.txt").unwrap(); let outfile = File::create("file.txt.gz").unwrap(); let mut gz_out = GzEncoder::new(outfile, Compression::default()); let search_term = "spam"; writer( BUF_SIZE, QUEUE_LEN, &mut gz_out, |writer| { // This function runs in the main thread, all writes are written to // 'gz_out' in the background let mut buf_infile = io::BufReader::new(infile); let mut line = String::new(); while buf_infile.read_line(&mut line)? > 0 { if line.contains(search_term) { writer.write(line.as_bytes()).expect("write error"); } line.clear(); } Ok::<_, io::Error>(()) }, ) .expect("encoding error"); gz_out.finish().expect("finishing failed"); ``` More details on the exact behavior and more flexible functions e.g. for dealing with *non-Send* writer types can be found in the documentation of the [write module](https://docs.rs/thread_io/latest/thread_io/write). After `func` returns, the background writer *always* calls `io::Write::flush`, making sure that possible flushing errors are caught before the file goes out of scope. ## Notes on errors Two types of errors may occur when using the readers and writers of this crate: * **`io::Error`** returned from `io::Read::read` / `io::Write::write` calls. This error cannot be returned *instantly*, instead it is pushed to a queue and will be returned in a subsequent read or write call. The delay depends on the `queuelen` parameter of the reading / writing functions, but also on the `bufsize` parameter and the size of the reading / writing buffer. * The `func` closure allows returning **custom errors** of any type, which may occur in the user program *after* reading from the background reader or *before* writing to the background writer. With the `thread_io` writer, there is the additional required trait bound `From` due to the way the writer works. Both with reading and writing, custom user errors are prioritized over eventual `io::Error`s. For example, it is possible that while parsing a file, a syntax error occurs, which the programmer returns from the `func` closure. Around the same time, `io::Error` may occur as well, e.g. because the GZIP file is truncated. If this error is still in the queue waiting to be reported as the syntax error happens, ultimately the syntax error will be returned and the `io::Error` discarded. After the func closure ends (with or without an error), a signal is placed in a queue telling the background thread to stop processing. However, `queuelen` reads or writes will be done before processing ultimately stops. More details on error handling are found in the documentation of the [read](https://docs.rs/thread_io/latest/thread_io/read) and [write](https://docs.rs/thread_io/latest/thread_io/write) modules. ## Crossbeam channels It is possible to use [the channel implementation from the crossbeam crate](https://docs.rs/crossbeam/latest/crossbeam/channel/index.html) by specifying the `crossbeam_channel` feature. The few tests I have done didn't show any performance gain over using the channels from the standard library. ## Similar projects [**fastq-rs**](https://github.com/aseyboldt/fastq-rs) provides a very similar functionality as `thread_io::read::reader` in its [`thread_reader`](https://docs.rs/fastq/latest/fastq/fn.thread_reader.html) module.