| Crates.io | montana-pipeline |
| lib.rs | montana-pipeline |
| version | 0.0.1 |
| created_at | 2025-12-16 16:53:45.436258+00 |
| updated_at | 2025-12-16 16:53:45.436258+00 |
| description | Core pipeline traits and types for Montana |
| homepage | https://github.com/base/montana |
| repository | https://github.com/base/montana |
| max_upload_size | |
| id | 1988263 |
| size | 57,105 |
Core pipeline traits and types for Montana.
A robust, trait-abstracted compression pipeline for L2 batch submission and derivation.
Montana implements a three-stage duplex pipeline, with mirrored paths for batch submission and derivation:
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ BatchSource │ ──▶ │ Compressor │ ──▶ │ BatchSink │
│ (L2 Blocks) │ │ (Brotli 11) │ │ (L1 Blobs) │
└───────────────┘ └───────────────┘ └───────────────┘
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ L1BatchSource │ ──▶ │ Decompressor │ ──▶ │ L2BlockSink │
│ (L1 Blobs) │ │ (Brotli) │ │ (L2 Blocks) │
└───────────────┘ └───────────────┘ └───────────────┘
| Trait | Purpose | Key Methods |
|---|---|---|
| `BatchSource` | Provides L2 blocks to batch | `pending_blocks()`, `l1_origin()`, `l1_origin_hash()`, `parent_hash()` |
| `Compressor` | Compress/decompress data | `compress()`, `decompress()`, `estimated_ratio()` |
| `BatchSink` | Submits compressed batches | `submit()`, `capacity()`, `health_check()` |
| `BatchCodec` | Encode/decode wire format | `encode()`, `decode()` |
| `L1BatchSource` | Fetches batches from L1 | `next_batch()`, `l1_head()` |
| `L2BlockSink` | Ingests derived L2 blocks | `ingest()` |
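To make the table concrete, here is a minimal sketch of what the `Compressor` trait shape implied by its row might look like, together with a pass-through implementation useful in tests. Only the method names come from the table; the exact signatures, the `CompressionError` payload, and the `NoopCompressor` type are assumptions, not the crate's actual API.

```rust
#[derive(Debug)]
pub struct CompressionError(pub String);

// Hypothetical trait shape; montana-pipeline's real signatures may differ.
pub trait Compressor {
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError>;
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError>;
    /// Expected uncompressed-to-compressed ratio, used for size prediction.
    fn estimated_ratio(&self) -> f64;
}

/// Trivial pass-through implementation for wiring up tests.
pub struct NoopCompressor;

impl Compressor for NoopCompressor {
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
        Ok(data.to_vec())
    }
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
        Ok(data.to_vec())
    }
    fn estimated_ratio(&self) -> f64 {
        1.0
    }
}

fn main() {
    let c = NoopCompressor;
    let out = c.compress(b"hello").unwrap();
    assert_eq!(c.decompress(&out).unwrap(), b"hello");
}
```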
The pipeline configuration is not exported directly from this crate. Configuration is typically done through implementations that use these traits. For example, the montana-batcher crate provides BatcherConfig for configuring batch submission pipelines.
Compression is configured separately:
```rust
pub struct CompressionConfig {
    pub level: u32,       // 1-11 for Brotli (default: 11, maximum)
    pub window_size: u32, // Log2 of window size (default: 22 = 4MB)
}
```
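The crate's documentation does not show how these fields are validated. As a sketch, one could clamp them into Brotli's legal ranges (quality 1-11, log2 window 10-24); the `clamped` helper below is hypothetical, not part of montana-pipeline.

```rust
pub struct CompressionConfig {
    pub level: u32,       // 1-11 for Brotli
    pub window_size: u32, // log2 of window size
}

impl CompressionConfig {
    /// Hypothetical helper: clamp fields into Brotli's valid ranges
    /// (quality 1..=11, lgwin 10..=24).
    pub fn clamped(self) -> Self {
        Self {
            level: self.level.clamp(1, 11),
            window_size: self.window_size.clamp(10, 24),
        }
    }
}

fn main() {
    let cfg = CompressionConfig { level: 99, window_size: 8 }.clamped();
    assert_eq!(cfg.level, 11);
    assert_eq!(cfg.window_size, 10);
}
```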
| Constant | Value | Purpose |
|---|---|---|
| `MAX_BATCH_SIZE` | 128 KB | Maximum compressed batch size (fits in 1 blob) |
| `MIN_BATCH_SIZE` | 1 KB | Minimum batch size to avoid dust submissions |
| `COMPRESSION_LEVEL` | 11 | Brotli maximum compression level |
| `COMPRESSION_WINDOW` | 22 | 4 MB sliding window for Brotli |
| `BATCH_INTERVAL` | 12 seconds | Aligned with L1 block time |
| `SEQUENCING_WINDOW` | 3600 L1 blocks | ~12 hours of L1 blocks |
| `SAFE_CONFIRMATIONS` | 12 L1 blocks | Required confirmations for safe head |
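The size-related constants can be expressed as Rust `const`s with sanity checks, which also confirms the relationship between the log2 window and the 4 MB figure in the table. Values are taken from the table above, not verified against the crate source.

```rust
// Names mirror the constants table; values are from the table.
const MAX_BATCH_SIZE: usize = 128 * 1024; // fits in one blob
const MIN_BATCH_SIZE: usize = 1024;       // avoid dust submissions
const COMPRESSION_WINDOW: u32 = 22;       // log2 of Brotli sliding window

fn main() {
    assert!(MIN_BATCH_SIZE < MAX_BATCH_SIZE);
    // A log2 window of 22 gives a 4 MiB sliding window: 2^22 = 4,194,304.
    assert_eq!(1usize << COMPRESSION_WINDOW, 4 * 1024 * 1024);
}
```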
The pipeline provides core traits for implementing batching strategies:
- BatchSource provides pending L2 blocks
- Compressor handles compression/decompression
- BatchCodec encodes blocks into wire format
- BatchSink submits compressed batches to L1

Implementations using these traits (like montana-batcher) typically follow a polling accumulation model, where batches are accumulated and submitted based on size and time thresholds.
The trait design allows for different batching strategies and submission patterns.
The current architecture supports continuous data feeding but uses batch processing semantics. For true streaming with target size control, the following enhancements would be needed:
Target Size Configuration: The current design has min and max but no target_batch_bytes to aim for a specific output size.
Predictive Batching: Logic to estimate when to "cut" a batch before it exceeds limits:
```rust
let estimated_compressed = uncompressed_size as f64 / compressor.estimated_ratio();
if estimated_compressed >= target_batch_bytes as f64 {
    // Cut batch here
}
```
Incremental Compression (optional): Brotli's streaming API could provide intermediate size estimates for better prediction.
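The cut heuristic above can be packaged as a runnable function. Here `estimated_ratio` is the expected uncompressed-to-compressed ratio (e.g. 4.0 means 4:1); the function name and parameters are illustrative, not the crate's API.

```rust
/// Decide whether to cut a batch before it exceeds the target compressed
/// size, using the compressor's estimated uncompressed/compressed ratio.
fn should_cut_batch(
    uncompressed_size: usize,
    estimated_ratio: f64,
    target_batch_bytes: usize,
) -> bool {
    let estimated_compressed = uncompressed_size as f64 / estimated_ratio;
    estimated_compressed >= target_batch_bytes as f64
}

fn main() {
    // At a 4:1 ratio, 400 KB of input compresses to an estimated 100 KB.
    assert!(should_cut_batch(400 * 1024, 4.0, 100 * 1024));
    assert!(!should_cut_batch(300 * 1024, 4.0, 100 * 1024));
}
```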
```rust
pub struct StreamingPipelineConfig {
    pub target_batch_bytes: usize, // Target compressed size (e.g., 100KB)
    pub max_batch_bytes: usize,    // Hard limit (128KB)
    pub min_batch_bytes: usize,    // Minimum to submit (1KB)
    pub max_blocks_per_batch: u16,
    pub max_wait_time: Duration,   // Force submit after this time
}
```
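A plausible `Default` for this sketch, using the sizes suggested in the field comments (100 KB target, 128 KB hard limit, 1 KB minimum). The 12-second wait mirrors `BATCH_INTERVAL`, and the block cap of 1024 is an arbitrary placeholder; both are assumptions.

```rust
use std::time::Duration;

pub struct StreamingPipelineConfig {
    pub target_batch_bytes: usize,
    pub max_batch_bytes: usize,
    pub min_batch_bytes: usize,
    pub max_blocks_per_batch: u16,
    pub max_wait_time: Duration,
}

impl Default for StreamingPipelineConfig {
    fn default() -> Self {
        Self {
            target_batch_bytes: 100 * 1024, // aim below the hard limit
            max_batch_bytes: 128 * 1024,    // one blob
            min_batch_bytes: 1024,          // avoid dust submissions
            max_blocks_per_batch: 1024,     // placeholder cap
            max_wait_time: Duration::from_secs(12), // one L1 slot
        }
    }
}

fn main() {
    let cfg = StreamingPipelineConfig::default();
    assert!(cfg.min_batch_bytes < cfg.target_batch_bytes);
    assert!(cfg.target_batch_bytes < cfg.max_batch_bytes);
}
```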
The trait abstractions are well-suited for streaming:
- BatchSource that yields blocks as they arrive
- Compressor::estimated_ratio(), which already exists for size prediction

Each batch begins with a fixed-size header:

┌─────────────────────────────────────────────────────────────┐
│ version (1B) | batch_number (8B) | l1_origin (8B) │
│ l1_origin_check (20B) | parent_check (20B) │
│ timestamp (8B) | block_count (2B) │
└─────────────────────────────────────────────────────────────┘
Block data follows the header, one entry per block:

┌─────────────────────────────────────────────────────────────┐
│ Block 0: [timestamp_delta: u16] [tx_count: u16] [txs...] │
│ Block 1: [timestamp_delta: u16] [tx_count: u16] [txs...] │
│ ... │
└─────────────────────────────────────────────────────────────┘
Each transaction: [tx_len: u24] [tx_data: tx_len bytes]
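The header and transaction layouts above can be serialized directly; a sketch follows, with field order and widths taken from the diagrams. Big-endian byte order is an assumption here, as the diagrams do not specify endianness, and the function names are illustrative.

```rust
/// Serialize the 67-byte batch header in the diagrammed field order.
fn encode_header(
    version: u8,
    batch_number: u64,
    l1_origin: u64,
    l1_origin_check: [u8; 20],
    parent_check: [u8; 20],
    timestamp: u64,
    block_count: u16,
) -> Vec<u8> {
    let mut out = Vec::with_capacity(67);
    out.push(version);
    out.extend_from_slice(&batch_number.to_be_bytes());
    out.extend_from_slice(&l1_origin.to_be_bytes());
    out.extend_from_slice(&l1_origin_check);
    out.extend_from_slice(&parent_check);
    out.extend_from_slice(&timestamp.to_be_bytes());
    out.extend_from_slice(&block_count.to_be_bytes());
    out
}

/// Encode one transaction as [tx_len: u24][tx_data].
fn encode_tx(tx: &[u8]) -> Vec<u8> {
    assert!(tx.len() < 1 << 24, "tx too large for u24 length");
    let len = (tx.len() as u32).to_be_bytes();
    let mut out = len[1..].to_vec(); // drop the high byte: 3-byte length
    out.extend_from_slice(tx);
    out
}

fn main() {
    let hdr = encode_header(0, 1, 2, [0; 20], [0; 20], 3, 4);
    // 1 + 8 + 8 + 20 + 20 + 8 + 2 = 67 bytes
    assert_eq!(hdr.len(), 67);
    assert_eq!(encode_tx(b"abc"), vec![0, 0, 3, b'a', b'b', b'c']);
}
```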
The outer wire frame wraps the Brotli-compressed payload with a version byte:

┌─────────────────────────────────────────────────────────────┐
│ [version: u8 = 0x00] [brotli_compressed_payload] │
└─────────────────────────────────────────────────────────────┘
The pipeline uses comprehensive error types for each component:
- SourceError: Connection failures, empty results, missing L1 origin
- SinkError: L1 connection failures, transaction failures, insufficient funds, blob gas pricing, timeouts
- CompressionError: Compression failures, corrupted data, size limits
- CodecError: Invalid versions, truncated data, invalid block counts
- PipelineError: Wrapper combining all error types

Implementations can determine retry strategies based on error types. For example, connection failures and timeout errors are typically retryable, while insufficient funds or sequencing window expiration are not.
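The retryability rule can be expressed as a simple classifier over error variants. The enum below is a simplified stand-in with illustrative variant names, not the crate's actual error types.

```rust
// Simplified stand-in for the sink-side error taxonomy.
enum SinkError {
    ConnectionFailed,
    Timeout,
    InsufficientFunds,
    SequencingWindowExpired,
}

/// Transient failures are retryable; permanent conditions are not.
fn is_retryable(err: &SinkError) -> bool {
    match err {
        SinkError::ConnectionFailed | SinkError::Timeout => true,
        SinkError::InsufficientFunds | SinkError::SequencingWindowExpired => false,
    }
}

fn main() {
    assert!(is_retryable(&SinkError::Timeout));
    assert!(!is_retryable(&SinkError::InsufficientFunds));
}
```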
This crate provides the core traits for building pipelines. Here's an example of implementing a custom source:
```rust
use async_trait::async_trait;
use montana_pipeline::{BatchSource, L2BlockData, SourceError};

struct MyBlockSource {
    // Your implementation details
}

#[async_trait]
impl BatchSource for MyBlockSource {
    async fn pending_blocks(&mut self) -> Result<Vec<L2BlockData>, SourceError> {
        // Fetch blocks from your L2 execution client
        todo!()
    }

    async fn l1_origin(&self) -> Result<u64, SourceError> {
        // Return current L1 origin block number
        todo!()
    }

    async fn l1_origin_hash(&self) -> Result<[u8; 20], SourceError> {
        // Return L1 origin block hash prefix
        todo!()
    }

    async fn parent_hash(&self) -> Result<[u8; 20], SourceError> {
        // Return parent L2 block hash prefix
        todo!()
    }
}
```
For a complete batching system, see the montana-batcher crate which provides a service that orchestrates these traits.
MIT