| Crates.io | mxpnexus-registry |
| lib.rs | mxpnexus-registry |
| version | 0.2.0 |
| created_at | 2025-11-08 11:30:21.712001+00 |
| updated_at | 2025-12-20 22:12:21.489065+00 |
| description | Central registry service for MXP Nexus agent discovery and health tracking |
| homepage | |
| repository | https://github.com/yafatek/mxpnexus |
| max_upload_size | |
| id | 1922781 |
| size | 271,120 |
Central registry for agent discovery and health tracking in the MXP Nexus mesh.
Redis (for storage):
docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine
Add to Cargo.toml:
[dependencies]
mxpnexus-registry = "0.2.0"
tokio = { version = "1.35", features = ["full"] }
use mxpnexus_registry::{RegistryService, AgentInfo, AgentStatus};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create registry service
let registry = RegistryService::new("redis://127.0.0.1:6379").await?;
// Register an agent
let info = AgentInfo {
id: uuid::Uuid::new_v4().to_string(),
name: "nlp-processor".to_string(),
capabilities: vec!["nlp".to_string(), "summarization".to_string()],
address: "127.0.0.1:9000".parse()?,
registered_at: chrono::Utc::now(),
last_heartbeat: chrono::Utc::now(),
status: AgentStatus::Online,
metadata: HashMap::new(),
};
let agent_id = info.id.clone(); // keep the ID: `info` is moved into `register`
registry.register(info).await?;
// Discover agents by capability
let nlp_agents = registry.discover("nlp").await?;
println!("Found {} NLP agents", nlp_agents.len());
// Send heartbeat
registry.heartbeat(&agent_id).await?;
// Subscribe to events
let mut events = registry.subscribe().await;
tokio::spawn(async move {
while let Some(event) = events.recv().await {
println!("Registry event: {:?}", event);
}
});
Ok(())
}
The Registry Service is fully integrated with the MXP protocol, allowing agents to register, discover, and send heartbeats over the network:
use mxp::protocol::{Message, MessageType};
use mxpnexus_registry::{
DiscoverRequest, HeartbeatRequest, MxpHandler, RegisterRequest, RegistryService,
};
use std::{collections::HashMap, sync::Arc};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create Registry Service
let registry = Arc::new(RegistryService::new("redis://127.0.0.1:6379").await?);
// Create MXP Handler
let handler = MxpHandler::new(registry.clone());
// Register agent via MXP
let register_request = RegisterRequest {
id: uuid::Uuid::new_v4().to_string(),
name: "my-agent".to_string(),
capabilities: vec!["nlp".to_string()],
address: "127.0.0.1:9000".parse()?,
metadata: HashMap::new(),
};
let payload = serde_json::to_vec(&register_request)?;
let message = Message::new(MessageType::AgentRegister, payload);
let response = handler.handle_message(message).await?;
// Discover agents via MXP
let request = DiscoverRequest {
capability: "nlp".to_string(),
};
let payload = serde_json::to_vec(&request)?;
let message = Message::new(MessageType::AgentDiscover, payload);
let response = handler.handle_message(message).await?;
// Send heartbeat via MXP
let request = HeartbeatRequest {
agent_id: register_request.id.clone(),
};
let payload = serde_json::to_vec(&request)?;
let message = Message::new(MessageType::AgentHeartbeat, payload);
let response = handler.handle_message(message).await?;
Ok(())
}
Run the comprehensive example:
# Make sure Redis is running
docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine
# Run the example
cargo run --example basic_usage
The main entry point for interacting with the registry.
// Default configuration (30s heartbeat timeout)
let registry = RegistryService::new("redis://127.0.0.1:6379").await?;
// Custom configuration
let registry = RegistryService::with_config(
"redis://127.0.0.1:6379",
Duration::from_secs(60), // Custom heartbeat timeout
).await?;
let info = AgentInfo {
id: uuid::Uuid::new_v4().to_string(),
name: "my-agent".to_string(),
capabilities: vec!["nlp".to_string()],
address: "127.0.0.1:9000".parse()?,
registered_at: chrono::Utc::now(),
last_heartbeat: chrono::Utc::now(),
status: AgentStatus::Online,
metadata: HashMap::new(),
};
registry.register(info).await?;
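Validation Rules: registration fails with an error if the agent name is invalid or already taken (`InvalidAgentName`, `AgentNameExists`) or if the capabilities are invalid (`InvalidCapability`, `InvalidCapabilities`).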
// Find all agents with "nlp" capability
let nlp_agents = registry.discover("nlp").await?;
// Only returns online agents
for agent in nlp_agents {
println!("{}: {}", agent.name, agent.address);
}
// Send heartbeat (resets TTL to 30s)
registry.heartbeat(&agent_id).await?;
// If heartbeat expires, agent is marked as Offline
// Health monitor checks every 5 seconds
let mut events = registry.subscribe().await;
tokio::spawn(async move {
while let Some(event) = events.recv().await {
match event {
RegistryEvent::AgentRegistered { agent_id, info } => {
println!("Agent registered: {}", info.name);
}
RegistryEvent::AgentUnregistered { agent_id } => {
println!("Agent unregistered: {}", agent_id);
}
RegistryEvent::AgentStatusChanged { agent_id, old, new } => {
println!("Agent {} status: {:?} -> {:?}", agent_id, old, new);
}
RegistryEvent::HeartbeatReceived { agent_id, timestamp } => {
println!("Heartbeat from {}", agent_id);
}
}
}
});
// Start background health monitor
let registry = Arc::new(registry);
let _monitor = registry.clone().start_health_monitor();
// Check registry health
let health = registry.health_check().await?;
println!("Total agents: {}", health.total_agents);
println!("Online agents: {}", health.online_agents);
pub struct AgentInfo {
pub id: AgentId, // Unique identifier (UUID)
pub name: String, // Human-readable name (unique)
pub capabilities: Vec<String>, // What this agent can do
pub address: SocketAddr, // Network address
pub registered_at: DateTime<Utc>, // Registration timestamp
pub last_heartbeat: DateTime<Utc>, // Last heartbeat timestamp
pub status: AgentStatus, // Current status
pub metadata: HashMap<String, String>, // Additional metadata
}
pub enum AgentStatus {
Online, // Healthy and responding to heartbeats
Offline, // Heartbeat expired
Degraded, // Online but reporting issues
}
pub enum RegistryEvent {
AgentRegistered { agent_id: AgentId, info: AgentInfo },
AgentUnregistered { agent_id: AgentId },
AgentStatusChanged { agent_id: AgentId, old: AgentStatus, new: AgentStatus },
HeartbeatReceived { agent_id: AgentId, timestamp: DateTime<Utc> },
}
┌─────────────────────────────────────────┐
│            Registry Service             │
│                                         │
│   ┌──────────┐  ┌──────────┐            │
│   │ Register │  │ Discover │            │
│   │ Handler  │  │ Handler  │            │
│   └────┬─────┘  └────┬─────┘            │
│        │             │                  │
│        └─────────────┴─────────┐        │
│                                │        │
│                         ┌──────▼──────┐ │
│                         │  Registry   │ │
│                         │    Core     │ │
│                         └──────┬──────┘ │
│                                │        │
│         ┌──────────────────────┼────┐   │
│         │                      │    │   │
│    ┌────▼────┐     ┌───────────▼┐  ┌▼──┐│
│    │  Redis  │     │   Event    │  │ H ││
│    │ Storage │     │    Bus     │  │ M ││
│    └─────────┘     └────────────┘  └───┘│
│                                         │
└─────────────────────────────────────────┘
# Agent metadata (Hash)
agent:{agent_id} -> JSON
# Capability index (Set)
capability:nlp -> {agent-1, agent-2, ...}
# Agent name index (String)
agent_name:my-agent -> agent-id
# Heartbeat TTL (String with TTL)
heartbeat:{agent_id} -> "1" (expires in 30s)
# All agents (Set)
agents:all -> {agent-1, agent-2, ...}
# Run unit tests (no Redis required)
cargo test
# Run all tests (requires Redis)
docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine
cargo test -- --include-ignored
# Redis connection
REDIS_URL=redis://127.0.0.1:6379
# Logging
RUST_LOG=info
// Default: 30s timeout, 5s check interval
let registry = RegistryService::with_config(
redis_url,
Duration::from_secs(30), // Heartbeat timeout
).await?;
All operations return Result<T, Error>:
pub enum Error {
AgentNotFound(String),
AgentNameExists(String),
InvalidAgentName(String),
InvalidCapability(String),
InvalidCapabilities(String),
Redis(redis::RedisError),
Serialization(serde_json::Error),
Internal(String),
}
Uses tracing for structured logging:
// Initialize logging
tracing_subscriber::fmt::init();
// Set log level
RUST_LOG=mxpnexus_registry=debug cargo run
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
command: redis-server --appendonly yes
registry:
image: mxpnexus/registry:latest
ports:
- "9000:9000"
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
- RUST_LOG=info
volumes:
redis-data:
For production, use Redis Sentinel or Redis Cluster:
// Redis Sentinel
let registry = RegistryService::new(
"redis://sentinel1:26379,sentinel2:26379,sentinel3:26379/mymaster"
).await?;
Dual-licensed under MIT OR Apache-2.0
See CONTRIBUTING.md for guidelines.