| Crates.io | ruv-swarm-transport |
| lib.rs | ruv-swarm-transport |
| version | 1.0.5 |
| created_at | 2025-06-30 04:41:17.178082+00 |
| updated_at | 2025-07-02 20:39:37.562131+00 |
| description | Transport layer for RUV-FANN swarm communication with WebSocket and SharedMemory support |
| homepage | |
| repository | https://github.com/ruvnet/ruv-FANN |
| max_upload_size | |
| id | 1731416 |
| size | 187,192 |
High-performance transport layer for distributed agent communication in the RUV-FANN swarm intelligence framework.
The ruv-swarm-transport crate provides a unified, async-first networking abstraction designed specifically for intelligent agent swarms. It delivers multiple transport backends optimized for different deployment scenarios, from high-throughput in-process communication to secure networked connections and low-latency shared memory transfers.
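The unifying idea is one trait, many backends: agent code talks to a single send/receive interface while the wire mechanics vary per deployment. The real crate exposes this as an async trait; the sketch below is a deliberately simplified synchronous version, with all names hypothetical rather than the crate's actual API, just to illustrate the shape of the abstraction.

```rust
use std::collections::VecDeque;

// Hypothetical, simplified sketch of a unified transport interface. The real
// crate's Transport trait is async; this synchronous toy only illustrates the
// "one trait, many backends" idea.
trait SimpleTransport {
    fn send(&mut self, to: &str, payload: String) -> Result<(), String>;
    fn receive(&mut self) -> Option<(String, String)>; // (sender, payload)
}

// A toy in-memory backend: messages loop back into a local queue.
struct LoopbackTransport {
    id: String,
    inbox: VecDeque<(String, String)>,
}

impl LoopbackTransport {
    fn new(id: &str) -> Self {
        Self { id: id.to_string(), inbox: VecDeque::new() }
    }
}

impl SimpleTransport for LoopbackTransport {
    fn send(&mut self, _to: &str, payload: String) -> Result<(), String> {
        // A real backend would serialize and hand the bytes to WebSocket,
        // shared memory, etc.; here we just enqueue locally.
        self.inbox.push_back((self.id.clone(), payload));
        Ok(())
    }

    fn receive(&mut self) -> Option<(String, String)> {
        self.inbox.pop_front()
    }
}
```

Because every backend implements the same trait, swapping in-process, WebSocket, or shared-memory transports does not change the calling code.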
Add to your Cargo.toml:
[dependencies]
ruv-swarm-transport = "1.0.5"
# Optional: Enable specific backends
ruv-swarm-transport = { version = "1.0.5", features = ["websocket", "shared-memory", "wasm"] }
- websocket (default): WebSocket transport implementation
- shared-memory (default): Shared memory transport for IPC
- wasm: WebAssembly support for browser environments

Perfect for co-located agents requiring microsecond-level communication:
use ruv_swarm_transport::{Transport, TransportConfig, in_process::InProcessTransport, protocol::Message};
// Create connected transport pair
let config = TransportConfig::default();
let (mut agent1, mut agent2) = InProcessTransport::create_pair(
"agent_1".to_string(),
"agent_2".to_string(),
config,
).await?;
// Send structured message
let request = Message::request(
"agent_1".to_string(),
"compute_fibonacci".to_string(),
serde_json::json!({"n": 42})
);
agent1.send("agent_2", request).await?;
// Receive and process
let (sender, message) = agent2.receive().await?;
println!("Received from {}: {:?}", sender, message);
Ideal for distributed agents across network boundaries:
use ruv_swarm_transport::{protocol::Message, websocket::{WebSocketTransport, WsMode}, Transport, TransportConfig};
// Server setup
let server = WebSocketTransport::new(
WsMode::Server {
bind_addr: "0.0.0.0:8080".to_string()
},
TransportConfig::default()
).await?;
// Client connection with TLS
let client = WebSocketTransport::new(
WsMode::Client {
url: "wss://swarm-hub.example.com:8080".to_string()
},
TransportConfig {
connection_timeout_ms: 10000,
retry_attempts: 5,
enable_compression: true,
..Default::default()
}
).await?;
// Broadcast to all connected agents
let announcement = Message::broadcast(
"coordinator".to_string(),
"task_assignment".to_string(),
serde_json::json!({
"task_id": "neural_training_batch_001",
"model_params": {...},
"deadline": "2024-01-15T10:00:00Z"
})
);
client.broadcast(announcement).await?;
Optimized for high-volume message passing between processes:
use ruv_swarm_transport::{protocol::Message, shared_memory::{SharedMemoryInfo, SharedMemoryTransport}, Transport, TransportConfig};
let memory_config = SharedMemoryInfo {
name: "neural_swarm_shared".to_string(),
size: 100 * 1024 * 1024, // 100MB shared segment
ring_buffer_size: 10 * 1024 * 1024, // 10MB ring buffer
};
let transport = SharedMemoryTransport::new(
memory_config,
TransportConfig {
max_message_size: 50 * 1024 * 1024, // 50MB max message
enable_compression: false, // Disable for raw throughput
..Default::default()
}
).await?;
// High-frequency neural network weight updates
for epoch in 0..1000 {
let weight_update = Message::event(
"trainer_node".to_string(),
"weights_updated".to_string(),
serde_json::json!({
"epoch": epoch,
"weights": generate_weight_matrix(), // Large tensor data
"loss": 0.001234
})
);
transport.broadcast(weight_update).await?;
}
let config = TransportConfig {
max_message_size: 100 * 1024 * 1024, // 100MB max
connection_timeout_ms: 30000, // 30 second timeout
retry_attempts: 10, // Aggressive retry
enable_compression: true, // Enable compression
compression_threshold: 4096, // Compress >4KB messages
..Default::default()
};
use ruv_swarm_transport::protocol::{Message, MessageType, ProtocolVersion};
// Create versioned request with headers
let message = Message::request(
"neural_coordinator".to_string(),
"distribute_training".to_string(),
serde_json::json!({
"dataset": "imagenet_subset",
"batch_size": 256,
"learning_rate": 0.001
})
)
.with_header("Authorization".to_string(), "Bearer <token>".to_string())
.with_header("Content-Encoding".to_string(), "gzip".to_string())
.with_priority(255) // Highest priority
.with_ttl(10); // 10 hop limit
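The chained `.with_header(...).with_priority(...).with_ttl(...)` calls follow the consuming-builder pattern: each helper takes `self` by value, mutates one field, and returns the message. A stripped-down reimplementation on a hypothetical message type (the real crate's fields may differ):

```rust
use std::collections::HashMap;

// Hypothetical, stripped-down message type reproducing the builder-style
// helpers shown above; not the crate's actual definition.
#[derive(Debug)]
struct SimpleMessage {
    source: String,
    headers: HashMap<String, String>,
    priority: u8,
    ttl: Option<u32>,
}

impl SimpleMessage {
    fn new(source: &str) -> Self {
        Self { source: source.to_string(), headers: HashMap::new(), priority: 0, ttl: None }
    }

    // Each helper consumes and returns `self`, which is what makes the
    // chained call style work without intermediate bindings.
    fn with_header(mut self, key: String, value: String) -> Self {
        self.headers.insert(key, value);
        self
    }

    fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    fn with_ttl(mut self, hops: u32) -> Self {
        self.ttl = Some(hops);
        self
    }
}
```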
| Type | Purpose | Response Required | Use Case |
|---|---|---|---|
| Request | Command execution | Yes | RPC calls, data queries |
| Response | Request acknowledgment | No | Results, acknowledgments |
| Event | State notifications | No | Status updates, alerts |
| Broadcast | Topic-based messaging | No | Global announcements |
| Heartbeat | Connection monitoring | No | Health checking |
| Control | Protocol management | Varies | Handshakes, flow control |
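The "Response Required" column above maps naturally onto a match over the message kind. A simplified mirror of the table (hypothetical enum, not the crate's `MessageType`):

```rust
// Simplified mirror of the message-type table: a hypothetical enum of
// message kinds and a helper reporting whether a response is expected.
#[derive(Debug, PartialEq)]
enum Kind {
    Request,
    Response,
    Event,
    Broadcast,
    Heartbeat,
    Control { response_required: bool }, // "Varies" per the table
}

fn response_required(kind: &Kind) -> bool {
    match kind {
        Kind::Request => true,
        Kind::Control { response_required } => *response_required,
        // Response, Event, Broadcast, and Heartbeat are fire-and-forget.
        _ => false,
    }
}
```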
use ruv_swarm_transport::protocol::{ControlOperation, Message, MessageType, ProtocolVersion};
// Client initiates handshake
let hello = Message::new("client_agent".to_string(), MessageType::Control {
operation: ControlOperation::Hello {
version: ProtocolVersion::CURRENT,
capabilities: vec!["compression".to_string(), "binary_codec".to_string()],
metadata: [("agent_type".to_string(), "neural_trainer".to_string())].into(),
}
});
// Server responds with negotiated capabilities
let hello_ack = Message::new("server_hub".to_string(), MessageType::Control {
operation: ControlOperation::HelloAck {
version: ProtocolVersion::CURRENT,
capabilities: vec!["compression".to_string()], // Intersection of capabilities
metadata: [("hub_region".to_string(), "us-west-2".to_string())].into(),
}
});
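The "intersection of capabilities" comment in the HelloAck is the heart of the negotiation: the server advertises only the features both sides support. A minimal sketch of that step (function name and shape are illustrative, not from the crate):

```rust
use std::collections::HashSet;

// Sketch of capability negotiation: keep only the capabilities that appear
// in both the client's and the server's advertised lists, preserving the
// client's ordering. Illustrative helper, not the crate's API.
fn negotiate_capabilities(client: &[String], server: &[String]) -> Vec<String> {
    let server_set: HashSet<&String> = server.iter().collect();
    client
        .iter()
        .filter(|c| server_set.contains(c))
        .cloned()
        .collect()
}
```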
Performance characteristics on typical hardware (Intel i7-10700K, 32GB RAM):
| Transport | Latency (avg) | Throughput | Memory Usage | CPU Usage |
|---|---|---|---|---|
| In-Process | ~500ns | 2M+ msgs/sec | Minimal | <1% |
| Shared Memory | ~2µs | 1M+ msgs/sec | ~100MB | 2-5% |
| WebSocket (local) | ~50µs | 100K msgs/sec | ~10MB | 5-10% |
| WebSocket (network) | ~10ms* | 10K msgs/sec | ~50MB | 10-15% |
*Network latency dependent
# Run all transport benchmarks
cargo bench
# Specific benchmark suites
cargo bench --bench transport_benchmarks -- in_process
cargo bench --bench transport_benchmarks -- shared_memory
cargo bench --bench transport_benchmarks -- websocket
use std::time::Instant;
// Benchmark different message sizes
const KB: usize = 1024;
for &size in &[KB, 10 * KB, 100 * KB, 1024 * KB, 10 * 1024 * KB] {
let data = vec![0u8; size];
let msg = Message::event("bench".to_string(), "test".to_string(),
serde_json::to_value(&data)?);
let start = Instant::now();
transport.send("target", msg).await?;
let duration = start.elapsed();
println!("{}KB: {:?}", size / KB, duration);
}
let secure_client = WebSocketTransport::new(
WsMode::Client {
url: "wss://secure-swarm.company.com:443".to_string()
},
TransportConfig {
connection_timeout_ms: 15000,
retry_attempts: 3,
// TLS handled automatically by tokio-tungstenite
..Default::default()
}
).await?;
use ruv_swarm_transport::TransportError;
use std::time::Duration;
match transport.send("target_agent", message).await {
Ok(()) => println!("Message sent successfully"),
Err(TransportError::ConnectionError(e)) => {
// Handle connection issues with retry
eprintln!("Connection failed: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
// Automatic reconnection will be attempted
},
Err(TransportError::MessageTooLarge { size, max }) => {
eprintln!("Message {} bytes exceeds limit {}", size, max);
// Implement message chunking or compression
},
Err(e) => eprintln!("Transport error: {}", e),
}
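The `MessageTooLarge` arm suggests chunking as a fallback. A minimal sketch of that idea, splitting an oversized payload into pieces no larger than the reported limit so each can be sent as its own message and reassembled by the receiver (illustrative helper, not part of the crate):

```rust
// Sketch of the chunking fallback mentioned in the MessageTooLarge arm:
// split an oversized payload into `max`-sized pieces. A real implementation
// would also tag chunks with a sequence number for reassembly.
fn chunk_payload(payload: &[u8], max: usize) -> Vec<Vec<u8>> {
    assert!(max > 0, "chunk size must be positive");
    payload.chunks(max).map(|c| c.to_vec()).collect()
}
```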
#[cfg(target_arch = "wasm32")]
use ruv_swarm_transport::wasm::WebWorkerTransport;
// Enable SharedArrayBuffer for high-performance communication
let transport = WebWorkerTransport::new(
worker_handle,
TransportConfig::default()
).await?;
// Works seamlessly with same Transport trait
transport.send("web_worker_agent", message).await?;
# Run all tests
cargo test
# Integration tests
cargo test --test integration_test
# Test specific transport
cargo test websocket
cargo test shared_memory
cargo test in_process
We welcome contributions! Please read our Contributing Guide and Code of Conduct.
git clone https://github.com/ruvnet/ruv-FANN.git
cd ruv-FANN/ruv-swarm/crates/ruv-swarm-transport
cargo build
cargo test
Licensed under either the Apache License, Version 2.0 or the MIT license, at your option.
Created by rUv - Building the future of distributed intelligence
Part of the RUV-FANN (Robust Unified Virtual Functional Artificial Neural Network) ecosystem - enabling adaptive, self-organizing swarm intelligence systems.