| Crates.io | newton-prover-aggregator |
| lib.rs | newton-prover-aggregator |
| version | 0.1.28 |
| created_at | 2026-01-06 16:26:28.828412+00 |
| updated_at | 2026-01-06 16:26:28.828412+00 |
| description | newton prover aggregator utils |
| homepage | |
| repository | https://github.com/newt-foundation/newton-prover-avs |
| max_upload_size | |
| id | 2026239 |
| size | 386,513 |
The aggregator crate is the core component of the Newton Prover AVS (Actively Validated Service), responsible for orchestrating BLS signature aggregation from multiple operators. It is the central coordinator for task initialization, signature collection, and aggregated-response submission.
The aggregator is designed for high availability, low latency, and robustness. It handles edge cases gracefully, prevents memory leaks through bounded data structures and cleanup mechanisms, and ensures errors in one task don't affect others.
The aggregator architecture consists of two main components that work together:
┌─────────────────────────────────────────────────────────────────┐
│ AggregatorCore (core.rs) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ • Task initialization & validation │ │
│ │ • Signature processing & buffering │ │
│ │ • Task response storage (task_responses) │ │
│ │ • Pending signature buffer (pending_signatures) │ │
│ │ • Background cleanup tasks │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ │ Channels (ServiceHandle) │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ BlsAggregatorService (bls.rs) │ │
│ │ • BLS signature aggregation engine │ │
│ │ • Per-task aggregator tasks │ │
│ │ • Signature verification │ │
│ │ • Quorum threshold checking │ │
│ │ • Aggregated response generation │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
**AggregatorCore (core.rs)**: high-level orchestrator exposed to callers; communicates with the BLS service through a `ServiceHandle`.

**BlsAggregatorService (bls.rs)**: low-level BLS aggregation engine.

Request flow:

1. `initialize_task()` → BLS service spawns a `single_task_aggregator` task → creates a per-task response channel → returns the receiver to `AggregatorCore`
2. `process_signed_response()` → buffered if the task is not yet initialized, otherwise sent to the BLS service, which verifies the signature, aggregates it by `task_response_digest`, and checks quorum thresholds
3. `wait_for_aggregation(task_id, timeout)` → receives from the task-specific channel → returns the aggregated response
4. `submit_aggregated_response()` → submits to the contract and cleans up task state

## AggregatorCore (core.rs)

`AggregatorCore` is the central orchestrator that manages the complete lifecycle of aggregation tasks. It provides a high-level API for task initialization, signature processing, and response submission while managing memory, concurrency, and error handling.
```rust
pub struct AggregatorCore {
    /// Service handle to interact with the BLS Aggregator Service
    pub service_handle: Arc<Mutex<ServiceHandle>>,
    /// To receive aggregated responses from the BLS Aggregator Service (legacy, deprecated)
    /// NOTE: This is kept for backward compatibility. New code uses task_response_receivers.
    pub aggregate_receiver: Arc<Mutex<AggregateReceiver>>,
    /// Per-task response receivers for direct routing (no mutex contention).
    /// Each task gets its own channel, eliminating response stealing and lock contention.
    /// DashMap enables lock-free concurrent access for different TaskIds.
    task_response_receivers:
        DashMap<TaskId, UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>>,
    /// DashMap for lock-free concurrent access - different tasks can access their states simultaneously.
    /// Per-task locking eliminates contention between tasks accessing different TaskIds.
    pub task_states: Arc<DashMap<TaskId, TaskState>>,
    /// Cancellation token for background tasks
    cancellation_token: CancellationToken,
}

/// Task state to reduce lock contention
#[derive(Debug, Clone)]
pub struct TaskState {
    /// Quorum numbers for reference timestamp queries
    quorum_nums: Vec<u8>,
    /// Operator errors for this task
    operator_errors: Vec<OperatorErrorResponse>,
    /// Expected operator count (for early exit detection)
    expected_operators: usize,
    /// Task responses by digest
    task_responses: HashMap<TaskResponseDigest, BindingTaskResponse>,
}
```
**Design Decisions**:

- `task_response_receivers` uses `DashMap`: lock-free concurrent access for different `TaskId`s. Per-task channels enable zero mutex contention; each task has its own dedicated channel, eliminating response stealing and serialization bottlenecks.
- `task_states` uses `Arc<DashMap>`: lock-free concurrent access for different tasks. Per-task locking eliminates contention between tasks accessing different `TaskId`s. Critical for high-throughput scenarios with 10k+ concurrent tasks.
- Per-task data is grouped in `TaskState` for better locality and reduced lock scope.

### new()

Initializes the aggregator core and spawns background tasks:
```rust
pub async fn new(
    avs_registry_reader: AvsRegistryChainReader,
    operator_registry_address: Address,
    ws_rpc_url: Option<String>,
    http_rpc_url: String,
) -> Result<Self, eyre::Error>
```
**Responsibilities**:

- Spawns `process_pending_signatures_loop`: processes buffered signatures when tasks are initialized
- Spawns `cleanup_expired_pending_signatures_loop`: removes expired pending signatures (5s timeout)
- Spawns `cleanup_stale_task_responses_loop`: evicts stale task responses (60s interval)

**Memory Safety**: All background tasks use a `CancellationToken` for graceful shutdown, ensuring resources are cleaned up when `AggregatorCore` is dropped.
### initialize_task()

Initializes a new aggregation task with validation:
```rust
pub async fn initialize_task(
    &self,
    task_id: TaskId,
    task_created_block: u64,
    quorum_nums: Vec<u8>,
    quorum_threshold_percentage: u8,
    time_to_expiry: Duration,
) -> Result<(), eyre::Error>
```
**Input Validation**: rejects zero task IDs (`TaskId::ZERO`) and other malformed inputs before any work is done.

**Flow**:

1. Builds a `TaskMetadata` with the task configuration
2. Clones the `ServiceHandle` before the async call (avoids holding the lock during the async operation)
3. Sends an `InitializeTask` message to the BLS service
4. Stores the returned receiver in `task_response_receivers` for `wait_for_aggregation()` to use

**Error Handling**: Returns structured errors with context. Errors in initialization don't affect other tasks.
**Performance**: Per-task channels eliminate mutex contention and response stealing, enabling truly concurrent processing of multiple tasks.
### process_signed_response()

Processes a signed task response from an operator:
```rust
pub async fn process_signed_response(
    &self,
    signed_response: SignedTaskResponse
) -> Result<(), eyre::Error>
```
**Flow**:

1. Validates `task_id` and `operator_id` (both must be non-zero)
2. Computes the `task_response_digest` via Keccak256 hash
3. Builds a `TaskSignature` and sends it to the BLS service
4. If the task is not yet initialized, buffers the signature in `pending_signatures` (with size limit check)
5. Stores the response in `task_responses` with size limit enforcement

**Memory Management**:

- Checks the `MAX_TASK_RESPONSES` limit before adding new task entries

**Error Handling**:

- `TaskNotFound`: buffers the signature for later processing

### wait_for_aggregation()

Waits for the aggregated response with a timeout, using per-task channels:
```rust
pub async fn wait_for_aggregation(
    &self,
    task_id: TaskId,
    timeout_duration: Duration,
) -> Result<BlsAggregationServiceResponse, AggregatorCoreError>
```
**Flow**: removes the task's dedicated receiver from `task_response_receivers` (guaranteeing a single waiter per task) and awaits the aggregated response under the timeout.

**Performance Benefits**: no shared mutex is taken on the receive path, so waiting on one task never blocks another.

**Error Handling**:

- `TaskNotInitialized`: task not found in the receivers map
- `Timeout`: includes operator errors for the specific task
- `AggregationServiceError`: task-specific errors (guaranteed to be for this `task_id`)
- `Cancelled`: operation was cancelled via the cancellation token

**Isolation**: Each task's response channel is independent. One task's channel closure doesn't affect others.
### submit_aggregated_response()

Submits the aggregated response to the contract:
```rust
pub async fn submit_aggregated_response(
    &self,
    avs_writer: &AvsWriter,
    task: Task,
    task_response: BindingTaskResponse,
    service_response: BlsAggregationServiceResponse,
) -> Result<TransactionReceipt, eyre::Error>
```
**Flow**:

1. Submits the aggregated response via the `AvsWriter`
2. Removes the task's `task_responses` entry and insertion order

**Cleanup**: Automatically removes the task from `task_responses` and `task_responses_insertion_order` after successful submission to prevent memory leaks.
### process_pending_signatures_loop

Continuously processes buffered signatures when notified:

- Listens on the `pending_notify_rx` channel for task IDs
- Clones the `ServiceHandle` to avoid holding the lock during async operations

**Performance**: Processes signatures in batches, collecting successful responses before a single lock acquisition.
### cleanup_expired_pending_signatures_loop

Periodically removes expired pending signatures:

- Runs every `PENDING_SIGNATURE_CLEANUP_INTERVAL` (10 seconds)
- Removes signatures older than `PENDING_SIGNATURE_TIMEOUT` (5 seconds)

### cleanup_stale_task_responses_loop

Periodically evicts stale task responses:
- Runs every `TASK_RESPONSES_CLEANUP_INTERVAL` (60 seconds)
- When over the `MAX_TASK_RESPONSES` limit, evicts the oldest 10% of entries

## BlsAggregatorService (bls.rs)

The BLS Aggregation Service is the low-level engine that performs cryptographic BLS signature aggregation. It runs per-task aggregation loops in isolated spawned tasks, verifying signatures, aggregating them, and checking quorum thresholds.
```rust
/// Aggregated operators information for a specific task_response_digest
pub struct AggregatedOperators {
    signers_apk_g2: BlsG2Point,                              // Aggregated public key (G2)
    signers_agg_sig_g1: Signature,                           // Aggregated signature (G1)
    signers_total_stake_per_quorum: HashMap<u8, U256>,       // Total stake per quorum
    signers_operator_ids_set: HashMap<FixedBytes<32>, bool>, // Set of signer operator IDs
}

/// Task metadata for initialization
pub struct TaskMetadata {
    task_id: TaskId,
    quorum_numbers: Vec<u8>,
    quorum_threshold_percentages: QuorumThresholdPercentages,
    time_to_expiry: Duration,
    window_duration: Duration,
    task_created_block: u64,
}
```
**Design Decisions**:

- Per-`task_response_digest` aggregation: different operators may propose different responses (different `task_response_digest` values). Each digest has its own aggregation state, allowing multiple valid responses to be aggregated simultaneously.

### start()

Initializes the BLS service and spawns the main loop:
```rust
pub fn start(self) -> (ServiceHandle, AggregateReceiver)
```
**Responsibilities**:

- Spawns the `run()` loop in a background task
- Returns a `ServiceHandle` (for sending messages) and an `AggregateReceiver` (for receiving responses)

### run()

Main message processing loop:
```rust
async fn run(
    self,
    mut msg_receiver: UnboundedReceiver<AggregationMessage>,
    aggregate_sender: UnboundedSender<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>,
)
```
**Message Types**:

- `InitializeTask`: creates a new task aggregator and spawns a `single_task_aggregator` task
- `ProcessSignature`: forwards the signature to the appropriate task aggregator via its per-task channel

**Memory Management**:

- `task_channels` HashMap with FIFO eviction (`MAX_ACTIVE_TASKS` = 10000)

**Error Isolation**: Each task runs in its own spawned task. Panics are caught and logged without affecting other tasks.
### single_task_aggregator()

Per-task aggregation logic:
```rust
async fn single_task_aggregator(
    avs_registry_service: A,
    metadata: TaskMetadata,
    aggregated_response_sender: UnboundedSender<...>,
    signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```
**Flow**:

- Fetches operator state, then delegates to `loop_task_aggregator()` to process signatures

**Isolation**: Each task runs in an isolated spawned task. Errors don't propagate to other tasks.
### loop_task_aggregator()

Main signature processing loop for a task:
```rust
async fn loop_task_aggregator(
    avs_registry_service: A,
    task_id: TaskId,
    task_created_block: u64,
    time_to_expiry: Duration,
    aggregated_response_sender: UnboundedSender<...>,
    mut signatures_rx: UnboundedReceiver<SignedTaskResponseDigest>,
    operator_state_avs: HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: HashMap<u8, u8>,
    quorum_apks_g1: Vec<BlsG1Point>,
    quorum_nums: Vec<u8>,
    window_duration: Duration,
) -> Result<(), BlsAggregationServiceError>
```
**Flow**:

- Maintains an `aggregated_operators` HashMap (keyed by `task_response_digest`)
- Calls `handle_new_signature()` to process each incoming signature

**Memory Management**:

- `aggregated_operators` is limited to `MAX_AGGREGATED_OPERATORS_PER_TASK` = 100 per task

### handle_new_signature()

Processes a new signature:
```rust
async fn handle_new_signature(
    avs_registry_service: &A,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    open_window: &mut bool,
    current_aggregated_response: &mut Option<BlsAggregationServiceResponse>,
    window_tx: &UnboundedSender<bool>,
    task_id: TaskId,
    task_created_block: u64,
    operator_state_avs: &HashMap<FixedBytes<32>, OperatorAvsState>,
    total_stake_per_quorum: &HashMap<u8, Uint<256, 4>>,
    quorum_threshold_percentage_map: &HashMap<u8, u8>,
    quorum_apks_g1: &[BlsG1Point],
    quorum_nums: &[u8],
    window_duration: Duration,
    signed_task_digest: Option<SignedTaskResponseDigest>,
) -> Result<(), BlsAggregationServiceError>
```
**Flow**:

- Verifies the signature and updates `aggregated_operators` for the `task_response_digest`

**Error Handling**:

- Returns a `RegistryError` when operator state or keys cannot be retrieved

### update_aggregated_operators()

Updates the aggregation state for a `task_response_digest`:
```rust
fn update_aggregated_operators(
    task_id: TaskId,
    aggregated_operators: &mut HashMap<FixedBytes<32>, AggregatedOperators>,
    operator_state: &OperatorAvsState,
    task_response_digest: FixedBytes<32>,
    bls_signature: Signature,
    operator_id: FixedBytes<32>,
) -> Result<AggregatedOperators, BlsAggregationServiceError>
```
**Flow**:

- Calls `aggregate_new_operator(task_id, ...)` to add the operator to an existing aggregation
- Otherwise creates a new `AggregatedOperators` entry

**Error Handling**:

- Returns `RegistryError { task_id, operator_context, reason }` if operator public keys are missing
- Takes a `task_id` parameter for error context (replaces previous `.unwrap()` panics)

## Memory Management

The aggregator implements multiple layers of memory protection to prevent unbounded growth.

**Task responses (`task_responses`)**:

- Limited to `MAX_TASK_RESPONSES` = 10000 tasks
- `task_responses_insertion_order: RwLock<HashMap<TaskId, u64>>` tracks insertion order
- `task_responses_insertion_counter: AtomicU64` for lock-free counter increments
- Background cleanup every `TASK_RESPONSES_CLEANUP_INTERVAL`

**Pending signatures (`pending_signatures`)**:

- Limited to `MAX_PENDING_SIGNATURE_TASKS` = 1000 tasks
- Entries expire after `PENDING_SIGNATURE_TIMEOUT` = 5 seconds, tracked via a `created_at: Instant` timestamp
- Background cleanup every `PENDING_SIGNATURE_CLEANUP_INTERVAL`

**BLS service**:

- `MAX_ACTIVE_TASKS` = 10000 concurrent tasks
- `MAX_AGGREGATED_OPERATORS_PER_TASK` = 100 response digests per task
The insertion counter uses `AtomicU64` for lock-free increments:

```rust
task_responses_insertion_counter: Arc<AtomicU64>

// Increment (lock-free)
let counter_value = task_responses_insertion_counter.fetch_add(1, Ordering::Relaxed) + 1;
```

**Benefits**: No blocking, no contention, constant-time operation.
The insertion order map uses `RwLock` to allow concurrent reads:

```rust
task_responses_insertion_order: Arc<RwLock<HashMap<TaskId, u64>>>

// Read (non-blocking, allows concurrent readers)
let insertion_order_read = task_responses_insertion_order.read().await;
let oldest = insertion_order_read.iter().min_by_key(|(_, order)| *order);

// Write (exclusive, but only when modifying)
let mut insertion_order_write = task_responses_insertion_order.write().await;
insertion_order_write.insert(task_id, counter_value);
```

**Benefits**:

- Reads are non-blocking and can proceed concurrently
- The exclusive write lock is taken only when modifying the map
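Putting the counter and the insertion-order map together, the oldest-first eviction performed by the cleanup loop can be sketched with std-only types. The small limit, `u64` task ids, and string payloads here are illustrative stand-ins for the real structures:

```rust
use std::collections::HashMap;

// Illustrative limit; the documented constant is MAX_TASK_RESPONSES = 10000.
const MAX_TASK_RESPONSES: usize = 4;

/// Sketch of the stale-response eviction pass: when over the limit, remove the
/// oldest ~10% of entries (at least one), oldest-first by insertion counter.
fn evict_stale(
    responses: &mut HashMap<u64, String>,
    insertion_order: &mut HashMap<u64, u64>,
) -> Vec<u64> {
    let mut evicted = Vec::new();
    if responses.len() <= MAX_TASK_RESPONSES {
        return evicted;
    }
    let to_evict = (responses.len() / 10).max(1);
    // Sort task ids by insertion counter so the oldest entries go first.
    let mut by_age: Vec<(u64, u64)> = insertion_order.iter().map(|(k, v)| (*k, *v)).collect();
    by_age.sort_by_key(|&(_, order)| order);
    for (task_id, _) in by_age.into_iter().take(to_evict) {
        responses.remove(&task_id);
        insertion_order.remove(&task_id);
        evicted.push(task_id);
    }
    evicted
}

fn main() {
    let mut responses = HashMap::new();
    let mut order = HashMap::new();
    for i in 0..6u64 {
        responses.insert(i, format!("response-{i}"));
        order.insert(i, i); // counter value at insertion time
    }
    let evicted = evict_stale(&mut responses, &mut order);
    assert_eq!(evicted, vec![0]); // the oldest entry goes first
    assert_eq!(responses.len(), 5);
    println!("evicted {evicted:?}");
}
```

The real code takes a read lock to find the oldest entries and a write lock only to remove them, as shown in the snippets above.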
The `ServiceHandle` is cloned before async operations to avoid holding locks:

```rust
// Clone handle before async call to avoid holding lock during async operation
let handle = {
    let locked_handle = self.service_handle.lock().await;
    locked_handle.clone()
};
let result = handle.process_signature(task_signature).await;
```

**Benefits**: The lock is released immediately after cloning, allowing other operations to proceed during the async call.
**Cleanup paths**:

- On successful submission, the `task_responses` entry is removed immediately
- Channel closure, detected during `ProcessSignature` handling, triggers cleanup
- Expiry or completion ends the per-task `single_task_aggregator`
- `AggregatorCore::Drop` cancels background tasks

## Concurrency Model

Each aggregation task runs in its own spawned task with dedicated channels, ensuring complete isolation:
```rust
// In BLS service run() loop
tokio::spawn(async move {
    let result = Self::single_task_aggregator(
        avs_registry_service,
        metadata,
        task_response_sender, // Task-specific response sender
        signature_rx,
    ).await;
    // Handle result, log errors
    // Response sender dropped here, channel closes naturally
});

// Monitor task for panic detection
tokio::spawn(async move {
    if let Err(e) = join_handle.await {
        error!("Task aggregator panicked: {:?}", e);
    }
});
```
**Benefits**:

- A panic in one task aggregator is caught and logged without affecting other tasks
- Dropping the response sender closes the channel naturally, signaling completion
The system uses unbounded channels for message passing with per-task isolation:

```rust
// Task initialization - creates TWO channels per task:
// 1. Signature channel (for sending signatures to the task aggregator)
// 2. Response channel (for receiving aggregated responses)
let (signature_tx, signature_rx) = mpsc::unbounded_channel::<SignedTaskResponseDigest>();
let (response_tx, response_rx) =
    mpsc::unbounded_channel::<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>();

// Store both channels
task_channels.insert(task_id, (signature_tx, response_tx, timestamp));

// Return response receiver to caller (AggregatorCore)
result_sender.send(Ok(response_rx))?;

// Result channel (oneshot for verification results)
let (result_tx, result_rx) = oneshot::channel();
```
**Benefits**:

- Sends never block, so task aggregators cannot stall producers
- Per-task channels keep each task's traffic fully isolated
- The oneshot channel gives each signature its own verification-result path
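The per-task routing idea can be demonstrated with std-only channels. This is a hedged sketch: the service uses tokio unbounded channels, but `std::sync::mpsc` shows the same property, that a waiter can only ever receive its own task's response, so "response stealing" is impossible by construction:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

fn main() {
    let mut receivers: HashMap<u32, mpsc::Receiver<String>> = HashMap::new();
    let mut senders: HashMap<u32, mpsc::Sender<String>> = HashMap::new();

    // One dedicated channel per task id.
    for task_id in [1u32, 2] {
        let (tx, rx) = mpsc::channel();
        senders.insert(task_id, tx);
        receivers.insert(task_id, rx);
    }

    // Simulated per-task aggregators, each completing independently.
    let handles: Vec<_> = senders
        .into_iter()
        .map(|(task_id, tx)| {
            thread::spawn(move || {
                tx.send(format!("aggregated response for task {task_id}")).unwrap();
            })
        })
        .collect();

    // Each waiter removes its own receiver, guaranteeing one waiter per task.
    let rx1 = receivers.remove(&1).unwrap();
    let rx2 = receivers.remove(&2).unwrap();
    assert_eq!(rx1.recv().unwrap(), "aggregated response for task 1");
    assert_eq!(rx2.recv().unwrap(), "aggregated response for task 2");
    for h in handles {
        h.join().unwrap();
    }
    println!("per-task routing ok");
}
```

Removing the receiver from the map before waiting mirrors what `wait_for_aggregation()` does, which is also how the single-waiter-per-task guarantee is enforced.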
Multiple strategies minimize lock contention:

- `RwLock::read()` allows concurrent reads
- The receiver is removed from the map in `wait_for_aggregation()`, ensuring only one waiter per task

Example:
```rust
// Read lock to find oldest (non-blocking)
let oldest_task_id_opt = {
    let insertion_order_read = self.task_responses_insertion_order.read().await;
    insertion_order_read.iter().min_by_key(|(_, order)| *order).map(|(id, _)| *id)
};

// Write lock only when evicting
if let Some(oldest_task_id) = oldest_task_id_opt {
    if task_responses_map.remove(&oldest_task_id).is_some() {
        let mut insertion_order_write = self.task_responses_insertion_order.write().await;
        insertion_order_write.remove(&oldest_task_id);
    }
}
```
All public methods validate inputs before processing:

- Non-zero `task_id` (`TaskId::ZERO` check)
- Non-zero `operator_id`

Example:
```rust
if task_id == TaskId::ZERO {
    return Err(eyre::eyre!("Invalid task_id: zero task ID"));
}
if operator_id.as_slice().iter().all(|&b| b == 0) {
    return Err(eyre::eyre!("Invalid operator_id: zero operator ID"));
}
```
All BlsAggregationServiceError variants are enriched with structured context fields for comprehensive debugging:
```rust
BlsAggregationServiceError::TaskExpired {
    task_id: TaskId,
    reason: String, // e.g., "task expired without reaching quorum threshold"
}

BlsAggregationServiceError::TaskNotFound {
    task_id: TaskId,
    reason: String, // e.g., "task not found in task_channels (task may not be initialized yet)"
}

BlsAggregationServiceError::SignatureVerificationError {
    task_id: TaskId,
    operator_id: FixedBytes<32>,
    verification_error: SignatureVerificationError, // DuplicateSignature, IncorrectSignature, etc.
}

BlsAggregationServiceError::SignaturesChannelClosed {
    task_id: TaskId,
    reason: String, // e.g., "signature channel receiver dropped (task aggregator may have finished)"
}

BlsAggregationServiceError::RegistryError {
    task_id: TaskId,
    operator_context: String, // e.g., " from operator 0x1234..." or empty
    reason: String, // e.g., "failed to get operator AVS state at block 12345: ..."
}

BlsAggregationServiceError::DuplicateTaskId {
    task_id: TaskId,
    reason: String, // e.g., "task already exists in task_channels (message #42)"
}
```
**Benefits of Structured Errors**:

- Every error includes the `task_id` and a specific `reason` for immediate debugging
- Signature errors include the `operator_id` to identify problematic operators
- Registry errors include `operator_context` when applicable
- `reason` strings explain exactly what went wrong and where

```rust
match result {
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        error!(
            task_id = %task_id,
            reason = %reason,
            "Task not found - buffering signature for later processing"
        );
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %hex::encode(operator_id.as_slice()),
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    // ... other error variants
}
```
Errors are isolated at multiple levels: per signature, per task, and per background loop. For example, in `process_pending_signatures_for_task()`, failed signatures are dropped and logged, but processing continues for the remaining signatures. As a result, the system degrades gracefully under failure.
The most significant performance improvements come from two architectural changes:

1. **Per-Task Channels** (eliminates response stealing): each task owns an `UnboundedReceiver<Result<BlsAggregationServiceResponse, BlsAggregationServiceError>>`
2. **DashMap for Task States** (eliminates lock contention)
Performance Comparison:
| Metric | Before (RwLock) | After (DashMap) | Improvement |
|---|---|---|---|
| Latency (10k tasks) | 5-500ms | 0.1-0.5ms | 10-1000x faster |
| Throughput | Limited by mutex | 10k+ concurrent | Linear scaling |
| Contention | High (global lock) | Zero (per-entry) | Eliminated |
| Response Stealing | Possible | Impossible | Guaranteed correctness |
**Implementation**:

```rust
// In initialize_task()
self.task_response_receivers.insert(task_id, response_receiver);
self.task_states.insert(task_id, TaskState::new(quorum_nums, broadcast_count));

// In wait_for_aggregation()
let (_, mut receiver) = self
    .task_response_receivers
    .remove(&task_id)
    .ok_or(AggregatorCoreError::TaskNotInitialized { task_id })?;
// Receive directly from the task-specific channel (no mutex lock)
match receiver.recv().await { ... }

// In process_signed_response()
if let Some(mut state) = self.task_states.get_mut(&task_id) {
    state.task_responses.insert(digest, response);
}
```
**Benefits**:

- Direct per-task routing with no shared mutex on the receive path
- Constant-time receiver lookup and removal per task

Batch insertion minimizes lock contention:
```rust
// Collect successful responses before lock acquisition
let mut successful_responses: Vec<(TaskResponseDigest, BindingTaskResponse)> = Vec::new();
for signature in signatures_to_process {
    // Process signature...
    if result.is_ok() {
        successful_responses.push((digest, response));
    }
}

// Single lock acquisition for batch insert
if !successful_responses.is_empty() {
    let mut task_responses_map = task_responses.lock().await;
    let task_entry = task_responses_map.entry(task_id).or_default();
    for (digest, response) in successful_responses {
        task_entry.entry(digest).or_insert(response);
    }
}
```
Read locks allow concurrent access:

```rust
// Multiple readers can check insertion order concurrently
let insertion_order_read = task_responses_insertion_order.read().await;
let oldest = insertion_order_read.iter().min_by_key(|(_, order)| *order);
```
Early returns avoid unnecessary work:

```rust
// Early return if no signatures to process
if signatures_to_process.is_empty() {
    return;
}

// Early return if task already exists
if task_channels.contains_key(&task_id) {
    result_sender
        .send(Err(BlsAggregationServiceError::DuplicateTaskId {
            task_id,
            reason: "task already exists in task_channels".to_string(),
        }))
        .ok();
    continue;
}
```
## Task Lifecycle

**Task initialization flow**:

1. `initialize_task()` is called with the task metadata
2. The message is sent via the `ServiceHandle` to the BLS service
3. The `run()` loop creates a `single_task_aggregator` task
4. The task's `signature_tx` is added to `task_channels`
5. `pending_notify_tx.send(task_id)` triggers a retry of any buffered signatures

**Signature processing flow**:

1. `process_signed_response()` is called with the signed response
2. `process_signature()` sends it to the BLS service
3. If the task is not yet initialized, the signature is buffered in `pending_signatures` (bounded by `MAX_PENDING_SIGNATURE_TASKS`)
4. Otherwise, the response is stored in `task_responses` with a size limit check

**Aggregation flow** (inside `single_task_aggregator`):

1. `update_aggregated_operators()` is called for the `task_response_digest`
2. `check_if_stake_thresholds_met()` is called
3. When thresholds are met, the aggregated result (`BlsAggregationServiceResponse`) is sent via the `aggregated_response_sender` after the aggregation window (`window_duration`)
4. The task ends at `time_to_expiry`; channel closure is detected during `ProcessSignature` handling

## Edge Cases

**Scenario**: Operator submits a signature before `initialize_task()` is called.
**Handling**:

- The signature is buffered in the `pending_signatures` HashMap with a `created_at` timestamp
- Once the task is initialized, `pending_notify_tx.send(task_id)` notifies the background loop

**Memory Safety**: Limited to `MAX_PENDING_SIGNATURE_TASKS` = 1000 tasks. New tasks are rejected at the limit.
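The buffering-plus-expiry behavior can be sketched std-only. This is illustrative, not the crate's code: `u64` task ids and `&'static str` payloads stand in for real `TaskId`s and `SignedTaskResponse`s, while the constants mirror the documented values:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const MAX_PENDING_SIGNATURE_TASKS: usize = 1000;
const PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5);

/// Buffered signatures for one not-yet-initialized task.
struct PendingEntry {
    created_at: Instant,
    signatures: Vec<&'static str>,
}

/// Buffer a signature, enforcing the task-count limit for new tasks.
fn buffer_signature(
    pending: &mut HashMap<u64, PendingEntry>,
    task_id: u64,
    sig: &'static str,
) -> Result<(), &'static str> {
    if !pending.contains_key(&task_id) && pending.len() >= MAX_PENDING_SIGNATURE_TASKS {
        return Err("pending signature buffer full");
    }
    pending
        .entry(task_id)
        .or_insert_with(|| PendingEntry { created_at: Instant::now(), signatures: Vec::new() })
        .signatures
        .push(sig);
    Ok(())
}

/// What the cleanup loop does each interval: drop entries past the timeout.
fn cleanup_expired(pending: &mut HashMap<u64, PendingEntry>) {
    pending.retain(|_, entry| entry.created_at.elapsed() < PENDING_SIGNATURE_TIMEOUT);
}

fn main() {
    let mut pending = HashMap::new();
    buffer_signature(&mut pending, 7, "sig-a").unwrap();
    buffer_signature(&mut pending, 7, "sig-b").unwrap();
    cleanup_expired(&mut pending); // fresh entries survive
    assert_eq!(pending.get(&7).map(|e| e.signatures.len()), Some(2));
    // Simulate a stale entry, when the platform lets us back-date an Instant.
    if let Some(old) = Instant::now().checked_sub(Duration::from_secs(10)) {
        pending.insert(8, PendingEntry { created_at: old, signatures: vec!["stale"] });
        cleanup_expired(&mut pending);
        assert!(!pending.contains_key(&8));
    }
    println!("pending buffer ok");
}
```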
**Scenario**: System receives more tasks than memory limits allow.
**Handling**:

- When `MAX_TASK_RESPONSES` is reached, the oldest entry is evicted (FIFO)
- When `MAX_PENDING_SIGNATURE_TASKS` is reached, new tasks are rejected
- When `MAX_ACTIVE_TASKS` is reached, the oldest task is evicted
- When `MAX_AGGREGATED_OPERATORS_PER_TASK` is reached, the oldest digest is evicted

**Logging**: All evictions are logged at warning level, including which entry was evicted and the current size.
**Scenario**: Task expires before quorum is reached.
**Handling**:

- The `single_task_aggregator` expires after `time_to_expiry`
- Subsequent `ProcessSignature` messages detect the channel closure
- The task is removed from `task_channels`

**Scenario**: The same operator submits multiple signatures for the same `task_response_digest`.
**Handling**:

- `is_duplicate_signature()` checks whether the operator is already in `signers_operator_ids_set`
- If so, it returns:

```rust
BlsAggregationServiceError::SignatureVerificationError {
    task_id,
    operator_id: signed_digest.operator_id,
    verification_error: SignatureVerificationError::DuplicateSignature,
}
```
Note: Different operators can sign same digest (aggregated), but same operator cannot sign twice.
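That asymmetry, same digest across operators is fine while a repeat from one operator is not, can be sketched std-only. The type name and 32-byte arrays here are illustrative stand-ins for the real `FixedBytes<32>` digests and operator ids:

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of duplicate detection: per response digest, remember which
/// operators have already signed.
#[derive(Default)]
struct DigestSigners(HashMap<[u8; 32], HashSet<[u8; 32]>>);

impl DigestSigners {
    /// Returns true if recorded, false if this operator already signed this digest.
    fn try_record(&mut self, digest: [u8; 32], operator_id: [u8; 32]) -> bool {
        self.0.entry(digest).or_default().insert(operator_id)
    }
}

fn main() {
    let mut signers = DigestSigners::default();
    let digest = [1u8; 32];
    let (op_a, op_b) = ([0xaau8; 32], [0xbbu8; 32]);
    assert!(signers.try_record(digest, op_a)); // first signature accepted
    assert!(!signers.try_record(digest, op_a)); // duplicate rejected
    assert!(signers.try_record(digest, op_b)); // different operator accepted
    println!("duplicate detection ok");
}
```

`HashSet::insert` returning `false` on a repeated key is what makes the duplicate check a single constant-time operation.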
**Scenario**: Client times out or cancels the request, and the receiver is dropped.
**Handling**:

- `result_channel.send()` returns `Err` if the receiver was dropped; the error is logged and ignored

Example:
```rust
if signed_digest.result_channel.send(verification_result).is_err() {
    warn!("Failed to send verification result (receiver dropped - likely timeout)");
    return Ok(()); // Don't propagate error
}
```
**Scenario**: Operator state exists but the public keys are `None`.
**Handling**:

- `update_aggregated_operators()` checks that `pub_keys` exist
- Returns `BlsAggregationServiceError::RegistryError { task_id, operator_context, reason }` if missing
- The error is logged with full context:

```rust
BlsAggregationServiceError::RegistryError {
    task_id,
    operator_context: format!(" from operator {}", hex::encode(operator_id.as_slice())),
    reason: "operator public keys not found in operator state".to_string(),
}
```

- The signature is not aggregated, but processing continues

**Previous Issue**: Used `.unwrap()`, which would panic. Now properly handled with a structured error return including `task_id`, `operator_context`, and a detailed `reason`.
All configuration constants are defined in the respective modules:
**Core (core.rs)**:

- `MAX_PENDING_SIGNATURE_TASKS: usize = 1000`
- `MAX_TASK_RESPONSES: usize = 10000`
- `PENDING_SIGNATURE_TIMEOUT: Duration = Duration::from_secs(5)`
- `PENDING_SIGNATURE_CLEANUP_INTERVAL: Duration = Duration::from_secs(10)` (removes signatures older than `PENDING_SIGNATURE_TIMEOUT`)
- `TASK_RESPONSES_CLEANUP_INTERVAL: Duration = Duration::from_secs(60)` (evicts when over the `MAX_TASK_RESPONSES` limit)

**BLS service (bls.rs)**:

- `MAX_ACTIVE_TASKS: usize = 10000` (bounds `task_channels`)
- `MAX_AGGREGATED_OPERATORS_PER_TASK: usize = 100` (bounds `aggregated_operators` per task)
## Usage Example
```rust
use newton_prover_aggregator::AggregatorCore;
use eigensdk::client_avsregistry::reader::AvsRegistryChainReader;

// Initialize aggregator
let aggregator = AggregatorCore::new(
    avs_registry_reader,
    operator_registry_address,
    ws_rpc_url,
    http_rpc_url,
).await?;

// Initialize a task
aggregator.initialize_task(
    task_id,
    task_created_block,
    quorum_nums,
    quorum_threshold_percentage,
    time_to_expiry,
).await?;

// Process signed response from operator
aggregator.process_signed_response(signed_response).await?;

// Wait for aggregation (with timeout) - now requires task_id parameter
let aggregated_response = aggregator.wait_for_aggregation(task_id, timeout_duration).await?;

// Submit to contract
let receipt = aggregator.submit_aggregated_response(
    &avs_writer,
    task,
    task_response,
    aggregated_response,
).await?;
```
All errors include structured context for debugging. Pattern matching on error variants provides access to detailed information:
```rust
match aggregator.process_signed_response(signed_response).await {
    Ok(()) => {
        info!("Signature processed successfully");
    }
    Err(BlsAggregationServiceError::TaskNotFound { task_id, reason }) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task not initialized yet - signature buffered for later processing"
        );
        // Signature will be processed when the task is initialized
    }
    Err(BlsAggregationServiceError::SignatureVerificationError {
        task_id,
        operator_id,
        verification_error,
    }) => {
        error!(
            task_id = %task_id,
            operator_id = %hex::encode(operator_id.as_slice()),
            verification_error = ?verification_error,
            "Signature verification failed"
        );
    }
    Err(BlsAggregationServiceError::RegistryError {
        task_id,
        operator_context,
        reason,
    }) => {
        error!(
            task_id = %task_id,
            operator_context = %operator_context,
            reason = %reason,
            "AVS registry error"
        );
    }
    Err(e) => {
        error!("Failed to process signature: {}", e);
        // All errors include task_id and reason in their Display implementation
    }
}
```
```rust
match aggregator.wait_for_aggregation(task_id, timeout_duration).await {
    Ok(response) => {
        info!("Aggregation successful: {} signers", response.non_signers_pub_keys_g1.len());
    }
    Err(AggregatorCoreError::TaskNotInitialized { task_id }) => {
        warn!(task_id = %task_id, "Task not initialized - call initialize_task first");
    }
    Err(AggregatorCoreError::Timeout { duration_ms, timeout_ms, operator_errors }) => {
        warn!(
            task_id = %task_id,
            duration_ms,
            timeout_ms,
            "Aggregation timed out after {} ms",
            timeout_ms
        );
        if let Some(errors) = operator_errors {
            warn!("Operator errors: {:?}", errors);
        }
    }
    Err(AggregatorCoreError::AggregationServiceError(
        BlsAggregationServiceError::TaskExpired { task_id, reason },
    )) => {
        warn!(
            task_id = %task_id,
            reason = %reason,
            "Task expired before aggregation completed"
        );
    }
    Err(AggregatorCoreError::Cancelled) => {
        warn!(task_id = %task_id, "Aggregation cancelled");
    }
    Err(e) => {
        error!(task_id = %task_id, "Aggregation failed: {}", e);
    }
}
```
**Error Context Access**:

- Pattern match on error variants to access `task_id`, `operator_id`, `reason`, etc.
- All errors implement `Display` with formatted context (e.g., "task 123 expired: quorum not reached")

## Customization

The aggregator can be customized by:

- Tuning memory limits (`MAX_TASK_RESPONSES`, etc.) based on expected load
- Adjusting `PENDING_SIGNATURE_CLEANUP_INTERVAL` and `TASK_RESPONSES_CLEANUP_INTERVAL`
- Integrating metrics (see the `newton_prover_metrics` usage)

## Performance Characteristics

The per-task channel architecture provides significant performance improvements:
**Metrics** (under high concurrency with 10k+ concurrent tasks):

- No contention on the legacy `AggregateReceiver` mutex; each task receives on its own channel

**Scalability**: throughput scales with the number of tasks, since different tasks never share a lock. The design also emphasizes fault tolerance (isolated per-task failures), structured error handling, and bounded memory: per-task state is cleaned up even if a task completes before `wait_for_aggregation()` starts.

## Summary

The Newton Prover Aggregator is designed for reliability, scalability, and high performance.
The architecture separates concerns cleanly: AggregatorCore handles high-level orchestration while BlsAggregatorService handles low-level cryptographic aggregation. The per-task channel architecture eliminates bottlenecks and enables true concurrent processing of multiple aggregation tasks.