| Crates.io | seq_io_parallel |
| lib.rs | seq_io_parallel |
| version | 0.2.1 |
| created_at | 2025-01-09 18:30:15.941572+00 |
| updated_at | 2025-03-15 16:43:59.132899+00 |
| description | A map-reduce style parallel extension to seq_io |
| homepage | |
| repository | https://github.com/noamteyssier/seq_io_parallel |
| max_upload_size | |
| id | 1510298 |
| size | 29,626 |
A parallel processing extension for the seq_io crate, providing an ergonomic API for parallel FASTA/FASTQ file processing.
For an alternative implementation with native paired-end support see paraseq.
While seq_io includes parallel implementations for both FASTQ and FASTA readers, this library offers an alternative approach with a potentially more ergonomic API that is not reliant on closures.
The implementation follows a Map-Reduce style of parallelism that emphasizes clarity and ease of use.
Paired-end processing is not currently supported.
The library implements a parallel processing pipeline with the following components:
- A reader thread fills `RecordSet`s until EOF.
- Worker threads process `RecordSet`s in parallel.
- `RecordSet`s may be processed out of order, but records within each set maintain their sequence.

To use parallel processing, implement one of the following traits:
```rust
// For single-file processing
pub trait ParallelProcessor: Send + Clone {
    // Map: Process individual records
    fn process_record<'a, Rf: MinimalRefRecord<'a>>(&mut self, record: Rf) -> Result<()>;

    // Reduce: Process completed batches (optional)
    fn on_batch_complete(&mut self) -> Result<()> {
        Ok(())
    }
}
```
Both FASTA and FASTQ records are accessed through the MinimalRefRecord trait:
```rust
pub trait MinimalRefRecord<'a> {
    fn ref_head(&self) -> &[u8]; // Header data
    fn ref_seq(&self) -> &[u8];  // Sequence data
    fn ref_qual(&self) -> &[u8]; // Quality scores (empty for FASTA)
}
```
This implementation allows for hooking into different stages of the processing pipeline:
- `process_record`: process individual records.
- `on_batch_complete`: perform an operation after each batch (optional).
- `on_thread_complete`: perform an operation after all batches within a thread (optional).
- `get_thread_id` / `set_thread_id`: access the thread ID (optional).

Here's a simple example that performs parallel processing of a FASTQ file:
```rust
use anyhow::Result;
use seq_io::fastq;
use seq_io_parallel::{MinimalRefRecord, ParallelProcessor, ParallelReader};
use std::sync::{atomic::AtomicUsize, Arc};

#[derive(Clone, Default)]
pub struct ExpensiveCalculation {
    local_sum: usize,
    global_sum: Arc<AtomicUsize>,
}

impl ParallelProcessor for ExpensiveCalculation {
    fn process_record<'a, Rf: MinimalRefRecord<'a>>(&mut self, record: Rf) -> Result<()> {
        let seq = record.ref_seq();
        let qual = record.ref_qual();

        // Simulate expensive calculation
        for _ in 0..100 {
            for (s, q) in seq.iter().zip(qual.iter()) {
                self.local_sum += (*s - 33) as usize + (*q - 33) as usize;
            }
        }
        Ok(())
    }

    fn on_batch_complete(&mut self) -> Result<()> {
        self.global_sum
            .fetch_add(self.local_sum, std::sync::atomic::Ordering::Relaxed);
        self.local_sum = 0;
        Ok(())
    }
}

fn main() -> Result<()> {
    let path = std::env::args().nth(1).expect("No path provided");
    let num_threads = std::env::args()
        .nth(2)
        .map(|n| n.parse().unwrap())
        .unwrap_or(1);

    let (handle, _) = niffler::send::from_path(&path)?;
    let reader = fastq::Reader::new(handle);
    let processor = ExpensiveCalculation::default();
    reader.process_parallel(processor.clone(), num_threads)?;
    Ok(())
}
```
FASTA/FASTQ processing is typically I/O-bound, so parallel processing benefits may vary:
- Consider wrapping processor state with heavy initialization costs in an `Arc`.
- Each worker thread receives its own `Clone` of the `ParallelProcessor`.
- Share results across threads through synchronized types (e.g. `Arc<AtomicUsize>`) or other `Arc`-wrapped primitives.

Currently this library uses anyhow for all error handling.
This is not ideal for libraries that need custom error types, but it works well for many CLI tools.
This may change in the future.