| Crates.io | daa-prime-core |
| lib.rs | daa-prime-core |
| version | 0.2.1 |
| created_at | 2025-06-25 15:20:48.311074+00 |
| updated_at | 2025-06-25 15:59:14.358035+00 |
| description | Core shared structures and protocol definitions for Prime distributed ML framework |
| homepage | https://github.com/example/prime-rust |
| repository | https://github.com/example/prime-rust |
| max_upload_size | |
| id | 1726024 |
| size | 185,726 |
Core shared structures and protocol definitions for the Prime distributed machine learning framework. This crate provides the foundational types, protocols, and message formats used across all Prime components.
DAA Prime Core is the foundational layer of the Prime distributed ML system, providing the shared types, protocol messages, and handler traits used by every other Prime crate.
Add this to your Cargo.toml:
```toml
[dependencies]
daa-prime-core = "0.2.1"
```
```rust
use daa_prime_core::{NodeId, TrainingConfig, OptimizerType, AggregationStrategy};

// Create a node identifier
let node_id = NodeId::new("trainer-001");

// Configure training parameters
let config = TrainingConfig {
    batch_size: 32,
    learning_rate: 0.001,
    epochs: 100,
    optimizer: OptimizerType::Adam { beta1: 0.9, beta2: 0.999 },
    aggregation_strategy: AggregationStrategy::FederatedAveraging,
};
```
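The `Adam { beta1, beta2 }` variant above carries the two moment-decay rates of the standard Adam optimizer. As a standalone sketch of what those hyperparameters do in a single update step (plain Rust with no `daa-prime-core` types; `adam_step` and its signature are illustrative, not a crate API):

```rust
/// One step of the standard Adam update rule.
/// `m` and `v` are the running first/second moment estimates,
/// `t` is the 1-based step count used for bias correction.
fn adam_step(
    w: &mut [f64], g: &[f64],
    m: &mut [f64], v: &mut [f64],
    t: u64, lr: f64, beta1: f64, beta2: f64, eps: f64,
) {
    for i in 0..w.len() {
        // Update biased first and second moment estimates
        m[i] = beta1 * m[i] + (1.0 - beta1) * g[i];
        v[i] = beta2 * v[i] + (1.0 - beta2) * g[i] * g[i];
        // Bias-correct the moments (they start at zero)
        let m_hat = m[i] / (1.0 - beta1.powi(t as i32));
        let v_hat = v[i] / (1.0 - beta2.powi(t as i32));
        // Apply the parameter update
        w[i] -= lr * m_hat / (v_hat.sqrt() + eps);
    }
}
```

With `learning_rate: 0.001` and fresh moment buffers, the first step moves each weight by roughly `lr` in the direction opposite its gradient.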
```rust
use daa_prime_core::{ProtocolMessage, MessageType, NodeId};

// Create a protocol message
let sender = NodeId::new("coordinator-1");
let message = ProtocolMessage::new(sender.clone(), MessageType::Ping);

// Serialize for network transmission
let serialized = serde_json::to_string(&message)?;

// Send to a specific recipient
let recipient = NodeId::new("trainer-1");
let targeted_message = ProtocolMessage::new(sender, MessageType::Ping)
    .with_recipient(recipient);
```
```rust
use daa_prime_core::{GradientUpdate, TrainingMetrics, NodeId};
use std::collections::HashMap;

// Create a gradient update
let update = GradientUpdate {
    node_id: NodeId::new("trainer-001"),
    model_version: 42,
    round: 10,
    gradients: HashMap::from([
        ("layer1.weight".to_string(), vec![0.1, -0.2, 0.05]),
        ("layer1.bias".to_string(), vec![0.01]),
    ]),
    metrics: TrainingMetrics {
        loss: 0.234,
        accuracy: 0.892,
        samples_processed: 1000,
        computation_time_ms: 150,
    },
    timestamp: 1634567890,
};
```
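A coordinator combining such updates under `AggregationStrategy::FederatedAveraging` averages each parameter's gradients element-wise across nodes. A minimal standalone sketch over maps shaped like the `gradients` field above (`federated_average` is a hypothetical helper, not an API exported by `daa-prime-core`; it assumes every node reports every parameter with the same shape):

```rust
use std::collections::HashMap;

/// Element-wise average of per-node gradient maps
/// (parameter name -> gradient values).
fn federated_average(
    updates: &[HashMap<String, Vec<f64>>],
) -> HashMap<String, Vec<f64>> {
    let mut sums: HashMap<String, Vec<f64>> = HashMap::new();
    for update in updates {
        for (name, grads) in update {
            // Accumulate each parameter's gradients across nodes
            let entry = sums
                .entry(name.clone())
                .or_insert_with(|| vec![0.0; grads.len()]);
            for (s, g) in entry.iter_mut().zip(grads) {
                *s += g;
            }
        }
    }
    // Divide each accumulated sum by the number of contributing nodes
    let n = updates.len() as f64;
    for values in sums.values_mut() {
        for v in values.iter_mut() {
            *v /= n;
        }
    }
    sums
}
```

Production aggregation would also weight nodes by `samples_processed` and screen for outliers, which is where the secure and Byzantine-fault-tolerant strategies come in.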
- `NodeId`: Unique identifier for network participants
- `ModelMetadata`: Model version and architecture information
- `TrainingConfig`: Comprehensive training parameter configuration
- `OptimizerType`: Support for SGD, Adam, and AdamW optimizers
- `AggregationStrategy`: Federated averaging, secure aggregation, and Byzantine-fault-tolerant methods
- `GradientUpdate`: Structured gradient sharing between nodes
- `TrainingMetrics`: Training performance and timing metrics
- `MessageType`: All supported message types for distributed coordination
- `ProtocolMessage`: Signed, versioned message wrapper
- `ProtocolHandler`: Trait for implementing custom message handlers

The protocol supports several categories of messages:
- `GradientUpdate`: Share model gradients with coordinators
- `ModelSync`: Distribute updated model parameters
- `TrainingRequest`: Request participation in training rounds
- `ConsensusProposal`: Propose values for consensus
- `ConsensusVote`: Vote on consensus proposals
- `ConsensusCommit`: Commit agreed-upon values
- `DhtPut`: Store key-value pairs in the distributed hash table
- `DhtGet`: Retrieve values from the DHT
- `DhtResponse`: Response with requested DHT values
- `Ping`/`Pong`: Network connectivity testing
- `JoinRequest`/`JoinResponse`: Network participation coordination

The crate includes comprehensive testing utilities:
```rust
use daa_prime_core::*;
use proptest::prelude::*;

// Property-based testing for gradient updates
proptest! {
    #[test]
    fn test_gradient_serialization(
        node_id in "[a-zA-Z0-9]{5,20}",
        version in 0u64..1000u64,
    ) {
        let update = GradientUpdate {
            node_id: NodeId::new(node_id),
            model_version: version,
            // ... rest of fields
        };
        let serialized = serde_json::to_string(&update).unwrap();
        let deserialized: GradientUpdate = serde_json::from_str(&serialized).unwrap();
        prop_assert_eq!(update.node_id, deserialized.node_id);
    }
}
```
All operations return `Result<T, Error>` for comprehensive error handling:
```rust
use daa_prime_core::{Result, Error, ProtocolMessage, PROTOCOL_VERSION};

fn process_message(data: &[u8]) -> Result<ProtocolMessage> {
    let message: ProtocolMessage = serde_json::from_slice(data)
        .map_err(|e| Error::Serialization(e.to_string()))?;

    // Reject messages from incompatible protocol versions
    if message.version != PROTOCOL_VERSION {
        return Err(Error::UnsupportedVersion(message.version));
    }

    Ok(message)
}
```
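Callers typically branch on the error variant to decide whether to retry, drop the peer, or reject the message. A standalone sketch of that pattern, using a local enum that mirrors the variants used in this README (the variant payload types and `describe` are illustrative assumptions, not the crate's definitions):

```rust
// Local stand-in mirroring the Error variants used above,
// so this sketch compiles without daa-prime-core.
#[derive(Debug)]
enum Error {
    Serialization(String),
    UnsupportedVersion(u16),
    InvalidMessage(String),
}

/// Map each failure class to a caller-side reaction.
fn describe(err: &Error) -> String {
    match err {
        Error::Serialization(e) => format!("retryable decode failure: {}", e),
        Error::UnsupportedVersion(v) => format!("peer speaks protocol v{}, drop connection", v),
        Error::InvalidMessage(reason) => format!("reject message: {}", reason),
    }
}
```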
Prime Core integrates seamlessly with the broader DAA ecosystem:
```rust
use daa_prime_core::{ProtocolHandler, ProtocolMessage, MessageType, Error, Result};
use async_trait::async_trait;

struct CustomHandler;

#[async_trait]
impl ProtocolHandler for CustomHandler {
    async fn handle_message(&self, message: ProtocolMessage) -> Result<Option<ProtocolMessage>> {
        match message.message_type {
            MessageType::Ping => {
                // Respond to a ping with a pong
                Ok(Some(ProtocolMessage::new(
                    message.sender.clone(),
                    MessageType::Pong,
                )))
            },
            _ => Ok(None),
        }
    }

    async fn validate_message(&self, message: &ProtocolMessage) -> Result<()> {
        // Custom validation logic: reject unsigned messages
        if message.signature.is_none() {
            return Err(Error::InvalidMessage("Missing signature".to_string()));
        }
        Ok(())
    }
}
```
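The dispatch logic inside `handle_message` is just a match on the message type: reply to some variants, ignore the rest. A synchronous standalone sketch of that same logic, with local stand-in types so it runs without the crate or an async runtime (`Msg`, `handle`, and `own_id` are illustrative, not crate APIs):

```rust
#[derive(Clone, Debug, PartialEq)]
enum MessageType { Ping, Pong }

#[derive(Clone, Debug)]
struct Msg {
    sender: String,
    message_type: MessageType,
}

/// Return a reply for messages that warrant one, None otherwise.
fn handle(msg: &Msg, own_id: &str) -> Option<Msg> {
    match msg.message_type {
        // Reply to a ping with a pong from our own identity
        MessageType::Ping => Some(Msg {
            sender: own_id.to_string(),
            message_type: MessageType::Pong,
        }),
        // All other message types are ignored by this handler
        _ => None,
    }
}
```

Returning `Option<ProtocolMessage>` keeps handlers composable: a node can chain several handlers and send whichever replies come back non-empty.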
```rust
use daa_prime_core::ModelMetadata;

let metadata = ModelMetadata {
    id: "resnet50-v1".to_string(),
    version: 1,
    architecture: "ResNet".to_string(),
    parameters_count: 25_557_032,
    created_at: 1634567890,
    updated_at: 1634567890,
};

// Track model evolution
let updated_metadata = ModelMetadata {
    version: metadata.version + 1,
    updated_at: current_timestamp(),
    ..metadata
};
```
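The `..metadata` struct-update syntax copies every field not listed explicitly, so only `version` and `updated_at` change between revisions. A standalone sketch using a local mirror of the struct (field set taken from the example above; the `bump` helper is illustrative, not a crate API):

```rust
// Local mirror of ModelMetadata so the pattern runs without the crate.
#[derive(Clone, Debug)]
struct ModelMetadata {
    id: String,
    version: u64,
    architecture: String,
    parameters_count: u64,
    created_at: u64,
    updated_at: u64,
}

/// Produce the next revision: bump the version, stamp the update time,
/// and carry every other field over unchanged via `..meta`.
fn bump(meta: ModelMetadata, now: u64) -> ModelMetadata {
    ModelMetadata { version: meta.version + 1, updated_at: now, ..meta }
}
```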
Prefer `serde_json::from_slice` for network data, which deserializes directly from bytes without an intermediate string allocation.

Contributions are welcome! Please see our Contributing Guide for details.
This project is licensed under the MIT License - see the LICENSE file for details.
- `daa-prime-dht`: Distributed hash table implementation
- `daa-prime-trainer`: Training node implementation
- `daa-prime-coordinator`: Coordination and governance
- `daa-prime-cli`: Command-line interface