| Field | Value |
|---|---|
| Crates.io | adaptive-pipeline |
| lib.rs | adaptive-pipeline |
| version | 2.0.0 |
| created_at | 2025-10-12 10:50:26 UTC |
| updated_at | 2025-10-12 10:50:26 UTC |
| description | High-performance, adaptive file processing pipeline with configurable stages, binary format support, and cross-platform compatibility |
| repository | https://github.com/abitofhelp/adaptive_pipeline.git |
| size | 2,901,050 bytes |
High-performance adaptive file processing pipeline with configurable stages, binary format support, and cross-platform compatibility.
This crate provides the application and infrastructure layers for the Adaptive Pipeline system, including use cases, services, adapters, repositories, and a production-ready CLI.
Add the library to your `Cargo.toml`:

```toml
[dependencies]
adaptive-pipeline = "2.0"
```

Or install the CLI binary:

```bash
cargo install adaptive-pipeline
```
This crate implements the Application and Infrastructure layers:
```text
┌─────────────────────────────────────────────┐
│              APPLICATION LAYER              │
│  ┌───────────────────────────────────────┐  │
│  │  Use Cases                            │  │
│  │   - ProcessFile                       │  │
│  │   - RestoreFile                       │  │
│  │   - CreatePipeline                    │  │
│  │   - ValidateFile                      │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │  Application Services                 │  │
│  │   - ConcurrentPipeline (orchestrator) │  │
│  │   - StreamingFileProcessor            │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────┐
│            INFRASTRUCTURE LAYER             │
│  ┌───────────────────────────────────────┐  │
│  │  Adapters                             │  │
│  │   - TokioFileIO                       │  │
│  │   - AsyncCompression                  │  │
│  │   - AsyncEncryption                   │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │  Repositories                         │  │
│  │   - SqlitePipelineRepository          │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │  Runtime Management                   │  │
│  │   - ResourceManager (global tokens)   │  │
│  │   - StageExecutor                     │  │
│  │   - Supervisor (task management)      │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
```
Processing a file through a pipeline:

```rust
use adaptive_pipeline::application::use_cases::ProcessFileUseCase;
use adaptive_pipeline::application::services::ConcurrentPipeline;
use adaptive_pipeline_domain::value_objects::PipelineId;
use std::path::Path;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the pipeline service. The file I/O service, pipeline repository,
    // and stage registry are constructed elsewhere (infrastructure layer).
    let pipeline_service = ConcurrentPipeline::new(
        file_io_service,
        pipeline_repository,
        stage_registry,
    );

    // Process a file through the "compress-encrypt" pipeline
    let config = ProcessFileConfig {
        input: Path::new("input.dat").to_path_buf(),
        output: Path::new("output.adapipe").to_path_buf(),
        pipeline: "compress-encrypt".to_string(),
        chunk_size_mb: Some(8),
        workers: None, // Auto-detect
        channel_depth: Some(4),
    };

    let result = ProcessFileUseCase::execute(config, pipeline_service).await?;

    println!(
        "Processed {} bytes in {:.2}s",
        result.bytes_processed,
        result.duration.as_secs_f64()
    );

    Ok(())
}
```
Creating a custom pipeline:

```rust
use adaptive_pipeline::application::use_cases::CreatePipelineUseCase;
use adaptive_pipeline_domain::entities::{Pipeline, PipelineStage, StageType};

// Define pipeline stages (name, type, order)
let stages = vec![
    PipelineStage::new("compress".to_string(), StageType::Compression, 1),
    PipelineStage::new("encrypt".to_string(), StageType::Encryption, 2),
    PipelineStage::new("checksum".to_string(), StageType::Checksum, 3),
];

// Create and save the pipeline
let pipeline = CreatePipelineUseCase::execute(
    "secure-backup".to_string(),
    stages,
    pipeline_repository,
).await?;

println!("Created pipeline: {}", pipeline.id());
```
Restoring a processed file:

```rust
use adaptive_pipeline::application::use_cases::RestoreFileUseCase;
use std::path::Path;

// Restore from the .adapipe format
let result = RestoreFileUseCase::execute(
    Path::new("backup.adapipe"),
    Some(Path::new("/restore/directory")),
    false, // mkdir
    false, // overwrite
    pipeline_service,
).await?;

println!("Restored to: {}", result.output_path.display());
```
```bash
# Basic processing
adaptive-pipeline process \
  --input data.bin \
  --output data.adapipe \
  --pipeline compress-encrypt

# With custom settings
adaptive-pipeline process \
  -i large.dat \
  -o large.adapipe \
  -p secure \
  --chunk-size-mb 16 \
  --workers 8 \
  --channel-depth 8
```

```bash
# Compression only
adaptive-pipeline create \
  --name fast-compress \
  --stages compression:lz4

# Full security pipeline
adaptive-pipeline create \
  --name secure-backup \
  --stages compression:zstd,encryption:aes256gcm,integrity
```

```bash
# Restore to original location
adaptive-pipeline restore --input backup.adapipe

# Restore to a specific directory
adaptive-pipeline restore \
  --input data.adapipe \
  --output-dir /tmp/restored \
  --mkdir \
  --overwrite
```

```bash
# Quick format validation
adaptive-pipeline validate-file --file output.adapipe

# Full integrity check
adaptive-pipeline validate-file --file output.adapipe --full
```

```bash
# Quick benchmark
adaptive-pipeline benchmark

# Comprehensive test
adaptive-pipeline benchmark \
  --size-mb 1000 \
  --iterations 5
```
For complete CLI documentation, see the root README.
Channel-Based Execution:
```text
┌──────────────┐    Channel     ┌──────────────┐     Direct      ┌──────────────┐
│    Reader    │───────────────▶│ CPU Workers  │────────────────▶│    Writer    │
│     Task     │  Backpressure  │  (Parallel)  │  Random Access  │  (.adapipe)  │
└──────────────┘                └──────────────┘                 └──────────────┘
        │                              │                                │
    File I/O                     Rayon Threads                  Concurrent Seeks
   (Streaming)                    (CPU-bound)                     (No Mutex!)
```
Key Optimizations:

- A bounded channel between the reader task and the CPU workers provides backpressure, so streaming file I/O never outruns processing.
- CPU-bound stage work runs on Rayon threads, keeping the Tokio runtime free for I/O.
- The writer performs concurrent random-access writes into the `.adapipe` output, so finished chunks land on disk without contending on a mutex.
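As a rough illustration of the channel/worker pattern above, the self-contained sketch below uses a bounded Tokio channel for backpressure and `spawn_blocking` in place of the Rayon pool; the chunk type, worker count, and `process_chunk` function are placeholders, not the crate's internals.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::task;

// Placeholder for CPU-bound stage work (compression, encryption, checksum).
fn process_chunk(chunk: Vec<u8>) -> Vec<u8> {
    chunk
}

#[tokio::main]
async fn main() {
    // Bounded channel: the reader awaits when workers fall behind (backpressure).
    let (tx, rx) = mpsc::channel::<Vec<u8>>(4);
    let rx = Arc::new(Mutex::new(rx));

    // Reader task: streams chunks into the channel.
    let reader = task::spawn(async move {
        for i in 0..16u8 {
            let chunk = vec![i; 1024]; // stand-in for a chunk read from disk
            if tx.send(chunk).await.is_err() {
                break;
            }
        }
        // Dropping `tx` closes the channel and lets the workers exit.
    });

    // A small pool of workers pulls chunks and does CPU-bound work off the
    // async runtime (spawn_blocking stands in for a Rayon thread pool here).
    let mut workers = Vec::new();
    for id in 0..4 {
        let rx = Arc::clone(&rx);
        workers.push(task::spawn(async move {
            loop {
                let chunk = rx.lock().await.recv().await;
                let Some(chunk) = chunk else { break };
                let processed = task::spawn_blocking(move || process_chunk(chunk))
                    .await
                    .expect("worker panicked");
                // A real writer would seek to this chunk's offset and write it.
                println!("worker {id}: processed {} bytes", processed.len());
            }
        }));
    }

    reader.await.unwrap();
    for w in workers {
        w.await.unwrap();
    }
}
```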
Measured with the `adaptive-pipeline benchmark` command (2025-10-07):
| File Size | Best Throughput | Optimal Config | Adaptive Config |
|---|---|---|---|
| 100 MB | 811 MB/s | 16MB chunks, 7 workers | 502 MB/s (16MB, 8 workers) |
| 1 GB | 822 MB/s | 64MB chunks, 5 workers | 660 MB/s (64MB, 10 workers) |
Performance Insights:

- Larger files benefit from larger chunks: the 1 GB file peaked with 64 MB chunks, while the 100 MB file peaked with 16 MB chunks.
- Hand-tuned configurations outperformed the adaptive defaults in these runs (811 vs 502 MB/s at 100 MB; 822 vs 660 MB/s at 1 GB).

Run your own benchmarks: `adaptive-pipeline benchmark --file <path>`
```bash
# Database
export ADAPIPE_SQLITE_PATH="./pipeline.db"

# Logging
export RUST_LOG="adaptive_pipeline=debug,tower_http=warn"

# Performance
export RAYON_NUM_THREADS=8
export TOKIO_WORKER_THREADS=4
```

```toml
# pipeline.toml
[pipeline]
chunk_size_mb = 8
parallel_workers = 0  # Auto-detect

[compression]
algorithm = "zstd"
level = "balanced"

[encryption]
algorithm = "aes256gcm"
key_derivation = "argon2id"
```
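If you need to load such a file in your own code, a minimal deserialization sketch using `serde` and the `toml` crate might look like the following; the struct and field names mirror the sample above but are assumptions, not the crate's own configuration types.

```rust
// Hypothetical config structs mirroring pipeline.toml; not the crate's own types.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Config {
    pipeline: PipelineSection,
    compression: CompressionSection,
    encryption: EncryptionSection,
}

#[derive(Debug, Deserialize)]
struct PipelineSection {
    chunk_size_mb: u64,
    parallel_workers: u32, // 0 = auto-detect
}

#[derive(Debug, Deserialize)]
struct CompressionSection {
    algorithm: String,
    level: String,
}

#[derive(Debug, Deserialize)]
struct EncryptionSection {
    algorithm: String,
    key_derivation: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let text = std::fs::read_to_string("pipeline.toml")?;
    let config: Config = toml::from_str(&text)?;
    println!("{config:?}");
    Ok(())
}
```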
```bash
# Start a run with the metrics endpoint
adaptive-pipeline process \
  --input data.bin \
  --output data.adapipe \
  --pipeline test

# Query metrics (default port: 9090)
curl http://localhost:9090/metrics
```
Key Metrics:
- `pipeline_throughput_bytes_per_second`
- `pipeline_cpu_queue_depth`
- `pipeline_worker_utilization`
- `pipeline_chunk_processing_duration_ms`

```bash
# Enable debug logging
RUST_LOG=adaptive_pipeline=debug adaptive-pipeline process ...

# Log to file
adaptive-pipeline process ... 2>&1 | tee pipeline.log
```
Implement the StageService trait:
```rust
use adaptive_pipeline_domain::services::StageService;
use adaptive_pipeline_domain::entities::{FileChunk, ProcessingContext};
// Import path assumed here; adjust to wherever PipelineError is exported
// from the domain crate.
use adaptive_pipeline_domain::PipelineError;

pub struct MyCustomStage {
    // Stage configuration
}

impl StageService for MyCustomStage {
    fn process_chunk(
        &self,
        chunk: FileChunk,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError> {
        // Custom processing logic (forward transformation)
        Ok(chunk)
    }

    fn reverse_chunk(
        &self,
        chunk: FileChunk,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError> {
        // Reverse transformation (used when restoring)
        Ok(chunk)
    }
}
```
```rust
use adaptive_pipeline::infrastructure::runtime::ResourceManager;

// Global resource manager
let rm = ResourceManager::global();

// Acquire CPU token (respects core count)
let cpu_token = rm.acquire_cpu_token().await?;

// Acquire I/O token (respects device type)
let io_token = rm.acquire_io_token().await?;
```

Tokens auto-release on drop.
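Conceptually, the tokens behave like semaphore permits. The stand-in below, built on `tokio::sync::Semaphore` rather than the crate's `ResourceManager`, shows why a token frees its slot automatically when dropped.

```rust
// Minimal stand-in for a resource manager using semaphore permits.
// Not the crate's ResourceManager; it only illustrates the drop-to-release idea.
use std::sync::Arc;
use tokio::sync::Semaphore;

// Local helper: number of logical CPUs, mirroring "respects core count".
fn logical_cpus() -> usize {
    std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

#[tokio::main]
async fn main() {
    let cpu_slots = Arc::new(Semaphore::new(logical_cpus()));

    {
        // Acquiring a token takes a permit...
        let _cpu_token = cpu_slots.clone().acquire_owned().await.unwrap();
        println!("permits while held: {}", cpu_slots.available_permits());
    } // ...and dropping the token returns the permit automatically.

    println!("permits after drop: {}", cpu_slots.available_permits());
}
```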
The .adapipe binary format includes:
```text
┌──────────────────────────────────────┐
│ Header (1024 bytes)                  │
│  - Magic bytes: ADAPIPE\0            │
│  - Version: 1                        │
│  - Original filename & checksum      │
│  - Pipeline ID and stages            │
│  - Compression/encryption config     │
├──────────────────────────────────────┤
│ Chunk 0 (variable size)              │
│  - Sequence number                   │
│  - Compressed size                   │
│  - Data                              │
│  - Checksum                          │
├──────────────────────────────────────┤
│ ... more chunks ...                  │
├──────────────────────────────────────┤
│ Footer (1024 bytes)                  │
│  - Total chunks                      │
│  - Output checksum                   │
│  - Processing timestamp              │
└──────────────────────────────────────┘
```
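For illustration only, the sketch below reads the fixed-size header and checks the `ADAPIPE\0` magic prefix; field offsets beyond the magic bytes are not specified here, so the code stops at that check rather than parsing the full format.

```rust
// Sketch: read the 1024-byte .adapipe header and verify the magic bytes.
// Not a full parser for the format.
use std::fs::File;
use std::io::Read;

const HEADER_LEN: usize = 1024;
const MAGIC: &[u8; 8] = b"ADAPIPE\0";

fn check_adapipe_header(path: &str) -> std::io::Result<bool> {
    let mut header = [0u8; HEADER_LEN];
    File::open(path)?.read_exact(&mut header)?;
    Ok(header.starts_with(&MAGIC[..]))
}

fn main() -> std::io::Result<()> {
    let ok = check_adapipe_header("output.adapipe")?;
    println!("valid .adapipe magic: {ok}");
    Ok(())
}
```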
```bash
# Run all tests
cargo test --workspace

# Unit tests only
cargo test --lib

# Integration tests
cargo test --test '*'

# With logging
RUST_LOG=debug cargo test -- --nocapture
```
BSD 3-Clause License - see LICENSE file for details.
Contributions are welcome; see the repository for current focus areas.
High Performance | Production Ready | Enterprise Security