mxpnexus-registry

Version: 0.2.0
Created: 2025-11-08 11:30:21.712001+00
Updated: 2025-12-20 22:12:21.489065+00
Description: Central registry service for MXP Nexus agent discovery and health tracking
Repository: https://github.com/yafatek/mxpnexus
ID: 1922781
Size: 271,120
Author: Feras Alawadi (ferasawadi)


MXP Nexus Registry Service

Central registry for agent discovery and health tracking in the MXP Nexus mesh.

Features

  • Agent Registration: Register agents with capabilities and metadata
  • Capability-Based Discovery: Find agents by what they can do
  • Health Tracking: Monitor agent liveness via heartbeats with TTL
  • Event Notifications: Subscribe to registry changes (pub/sub)
  • Redis-Backed: Fast, persistent storage with connection pooling
  • Production-Ready: Comprehensive error handling and logging

Quick Start

Prerequisites

  1. Redis (for storage):

    docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine
    
  2. Add to Cargo.toml:

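    [dependencies]
    mxpnexus-registry = "0.2.0"
    tokio = { version = "1.35", features = ["full"] }
    # Used by the examples below to build AgentInfo values
    uuid = { version = "1", features = ["v4"] }
    chrono = "0.4"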
    

Basic Usage

use mxpnexus_registry::{RegistryService, AgentInfo, AgentStatus};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create registry service
    let registry = RegistryService::new("redis://127.0.0.1:6379").await?;

    // Register an agent
    let info = AgentInfo {
        id: uuid::Uuid::new_v4().to_string(),
        name: "nlp-processor".to_string(),
        capabilities: vec!["nlp".to_string(), "summarization".to_string()],
        address: "127.0.0.1:9000".parse()?,
        registered_at: chrono::Utc::now(),
        last_heartbeat: chrono::Utc::now(),
        status: AgentStatus::Online,
        metadata: HashMap::new(),
    };

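    // Keep a copy of the ID for the heartbeat below; `register` takes ownership of `info`
    let agent_id = info.id.clone();
    registry.register(info).await?;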

    // Discover agents by capability
    let nlp_agents = registry.discover("nlp").await?;
    println!("Found {} NLP agents", nlp_agents.len());

    // Send heartbeat
    registry.heartbeat(&agent_id).await?;

    // Subscribe to events
    let mut events = registry.subscribe().await;
    tokio::spawn(async move {
        while let Some(event) = events.recv().await {
            println!("Registry event: {:?}", event);
        }
    });

    Ok(())
}

Using MXP Protocol Integration

The Registry Service is fully integrated with the MXP protocol, allowing agents to register, discover, and send heartbeats over the network:

use mxp::protocol::{Message, MessageType};
use mxpnexus_registry::{
    DiscoverRequest, HeartbeatRequest, MxpHandler, RegisterRequest, RegistryService,
};
use std::{collections::HashMap, sync::Arc};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create Registry Service
    let registry = Arc::new(RegistryService::new("redis://127.0.0.1:6379").await?);
    
    // Create MXP Handler
    let handler = MxpHandler::new(registry.clone());
    
    // Register agent via MXP
    let register_request = RegisterRequest {
        id: uuid::Uuid::new_v4().to_string(),
        name: "my-agent".to_string(),
        capabilities: vec!["nlp".to_string()],
        address: "127.0.0.1:9000".parse()?,
        metadata: HashMap::new(),
    };
    
    let payload = serde_json::to_vec(&register_request)?;
    let message = Message::new(MessageType::AgentRegister, payload);
    let response = handler.handle_message(message).await?;
    
    // Discover agents via MXP
    let request = DiscoverRequest {
        capability: "nlp".to_string(),
    };
    let payload = serde_json::to_vec(&request)?;
    let message = Message::new(MessageType::AgentDiscover, payload);
    let response = handler.handle_message(message).await?;
    
    // Send heartbeat via MXP
    let request = HeartbeatRequest {
        agent_id: register_request.id.clone(),
    };
    let payload = serde_json::to_vec(&request)?;
    let message = Message::new(MessageType::AgentHeartbeat, payload);
    let response = handler.handle_message(message).await?;
    
    Ok(())
}

Examples

Run the comprehensive example:

# Make sure Redis is running
docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine

# Run the example
cargo run --example basic_usage

API Overview

RegistryService

The main entry point for interacting with the registry.

Creating a Registry

use std::time::Duration;

// Default configuration (30s heartbeat timeout)
let registry = RegistryService::new("redis://127.0.0.1:6379").await?;

// Custom configuration
let registry = RegistryService::with_config(
    "redis://127.0.0.1:6379",
    Duration::from_secs(60), // Custom heartbeat timeout
).await?;

Registering Agents

let info = AgentInfo {
    id: uuid::Uuid::new_v4().to_string(),
    name: "my-agent".to_string(),
    capabilities: vec!["nlp".to_string()],
    address: "127.0.0.1:9000".parse()?,
    registered_at: chrono::Utc::now(),
    last_heartbeat: chrono::Utc::now(),
    status: AgentStatus::Online,
    metadata: HashMap::new(),
};

registry.register(info).await?;

Validation Rules:

  • Name: 3-64 chars, alphanumeric + hyphens/underscores
  • Capabilities: 1-20 items, each 2-32 chars, lowercase alphanumeric + hyphens
  • Name must be unique
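
The length and character rules above can be expressed as two small checks. This is a minimal sketch that uses only the standard library; `ValidationError`, `validate_name`, and `validate_capabilities` are hypothetical names chosen for illustration, and the crate's own validation code may differ (for example in how it treats non-ASCII input). The uniqueness rule is not covered here because it needs a storage lookup.

```rust
/// Hypothetical error type for this sketch; the crate's `Error` enum may differ.
#[derive(Debug, PartialEq)]
enum ValidationError {
    InvalidAgentName(String),
    InvalidCapability(String),
    InvalidCapabilities(String),
}

/// Name: 3-64 chars, ASCII alphanumeric plus '-' and '_'.
fn validate_name(name: &str) -> Result<(), ValidationError> {
    let len = name.chars().count();
    let chars_ok = name
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
    if (3..=64).contains(&len) && chars_ok {
        Ok(())
    } else {
        Err(ValidationError::InvalidAgentName(name.to_string()))
    }
}

/// Capabilities: 1-20 items, each 2-32 chars, lowercase ASCII alphanumeric plus '-'.
fn validate_capabilities(caps: &[String]) -> Result<(), ValidationError> {
    if caps.is_empty() || caps.len() > 20 {
        return Err(ValidationError::InvalidCapabilities(format!(
            "expected 1-20 capabilities, got {}",
            caps.len()
        )));
    }
    for cap in caps {
        let len = cap.chars().count();
        let chars_ok = cap
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-');
        if !(2..=32).contains(&len) || !chars_ok {
            return Err(ValidationError::InvalidCapability(cap.clone()));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_name("nlp-processor").is_ok());
    assert!(validate_name("ab").is_err()); // too short

    let caps = vec!["nlp".to_string(), "summarization".to_string()];
    assert!(validate_capabilities(&caps).is_ok());
    assert!(validate_capabilities(&["NLP".to_string()]).is_err()); // uppercase
    println!("validation checks passed");
}
```

Running the same checks on the client before calling `register` avoids a round trip for requests that would be rejected anyway.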

Discovering Agents

// Find all agents with "nlp" capability
let nlp_agents = registry.discover("nlp").await?;

// Only returns online agents
for agent in nlp_agents {
    println!("{}: {}", agent.name, agent.address);
}

Heartbeats

// Send heartbeat (resets TTL to 30s)
registry.heartbeat(&agent_id).await?;

// If heartbeat expires, agent is marked as Offline
// Health monitor checks every 5 seconds
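
The heartbeat and expiry behaviour described above can be modelled without Redis. This is a minimal sketch under stated assumptions: `HeartbeatTracker` and `Status` are hypothetical names, an in-memory map stands in for the TTL keys, and time is passed in explicitly so the logic can be followed step by step. It is not the crate's implementation.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Online,
    Offline,
}

/// Hypothetical in-memory stand-in for the per-agent heartbeat TTL.
struct HeartbeatTracker {
    timeout: Duration,
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatTracker {
    fn new(timeout: Duration) -> Self {
        Self { timeout, last_seen: HashMap::new() }
    }

    /// Record a heartbeat, which restarts the agent's TTL window.
    fn heartbeat(&mut self, agent_id: &str, now: Instant) {
        self.last_seen.insert(agent_id.to_string(), now);
    }

    /// What a periodic health check would conclude at time `now`.
    /// Returns `None` for agents that never sent a heartbeat.
    fn status(&self, agent_id: &str, now: Instant) -> Option<Status> {
        let last = self.last_seen.get(agent_id)?;
        if now.duration_since(*last) < self.timeout {
            Some(Status::Online)
        } else {
            Some(Status::Offline)
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut tracker = HeartbeatTracker::new(Duration::from_secs(30));
    tracker.heartbeat("agent-1", start);

    // 29s after the heartbeat the window is still open.
    let at_29 = start + Duration::from_secs(29);
    assert_eq!(tracker.status("agent-1", at_29), Some(Status::Online));

    // 30s without a heartbeat: the agent counts as Offline.
    let at_30 = start + Duration::from_secs(30);
    assert_eq!(tracker.status("agent-1", at_30), Some(Status::Offline));

    // A fresh heartbeat restarts the window.
    tracker.heartbeat("agent-1", at_30);
    let at_45 = start + Duration::from_secs(45);
    assert_eq!(tracker.status("agent-1", at_45), Some(Status::Online));
}
```

Because the monitor only checks every 5 seconds, an expired agent may be reported as Offline up to roughly 5 seconds after its window lapses. Sending heartbeats well inside the timeout (for example every 10 seconds with a 30-second timeout) leaves room for a missed beat.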

Event Subscription

let mut events = registry.subscribe().await;

tokio::spawn(async move {
    while let Some(event) = events.recv().await {
        match event {
            RegistryEvent::AgentRegistered { info, .. } => {
                println!("Agent registered: {}", info.name);
            }
            RegistryEvent::AgentUnregistered { agent_id } => {
                println!("Agent unregistered: {}", agent_id);
            }
            RegistryEvent::AgentStatusChanged { agent_id, old, new } => {
                println!("Agent {} status: {:?} -> {:?}", agent_id, old, new);
            }
            RegistryEvent::HeartbeatReceived { agent_id, .. } => {
                println!("Heartbeat from {}", agent_id);
            }
        }
    }
});

Health Monitoring

// Start background health monitor
let registry = Arc::new(registry);
let _monitor = registry.clone().start_health_monitor();

// Check registry health
let health = registry.health_check().await?;
println!("Total agents: {}", health.total_agents);
println!("Online agents: {}", health.online_agents);

Data Models

AgentInfo

pub struct AgentInfo {
    pub id: AgentId,                      // Unique identifier (UUID)
    pub name: String,                     // Human-readable name (unique)
    pub capabilities: Vec<String>,        // What this agent can do
    pub address: SocketAddr,              // Network address
    pub registered_at: DateTime<Utc>,     // Registration timestamp
    pub last_heartbeat: DateTime<Utc>,    // Last heartbeat timestamp
    pub status: AgentStatus,              // Current status
    pub metadata: HashMap<String, String>, // Additional metadata
}

AgentStatus

pub enum AgentStatus {
    Online,    // Healthy and responding to heartbeats
    Offline,   // Heartbeat expired
    Degraded,  // Online but reporting issues
}

RegistryEvent

pub enum RegistryEvent {
    AgentRegistered { agent_id: AgentId, info: AgentInfo },
    AgentUnregistered { agent_id: AgentId },
    AgentStatusChanged { agent_id: AgentId, old: AgentStatus, new: AgentStatus },
    HeartbeatReceived { agent_id: AgentId, timestamp: DateTime<Utc> },
}

Architecture

Components

┌─────────────────────────────────────────┐
│         Registry Service                │
│                                         │
│  ┌──────────┐  ┌──────────┐           │
│  │ Register │  │ Discover │           │
│  │ Handler  │  │ Handler  │           │
│  └────┬─────┘  └────┬─────┘           │
│       │             │                  │
│       └─────────────┴─────────┐        │
│                               │        │
│                        ┌──────▼──────┐ │
│                        │  Registry   │ │
│                        │    Core     │ │
│                        └──────┬──────┘ │
│                               │        │
│        ┌──────────────────────┼────┐   │
│        │                      │    │   │
│   ┌────▼────┐    ┌───────────▼┐  ┌▼──┐│
│   │  Redis  │    │   Event    │  │ H │││
│   │ Storage │    │    Bus     │  │ M │││
│   └─────────┘    └────────────┘  └───┘│
│                                         │
└─────────────────────────────────────────┘

Redis Data Structures

# Agent metadata (Hash)
agent:{agent_id} -> JSON

# Capability index (Set)
capability:nlp -> {agent-1, agent-2, ...}

# Agent name index (String)
agent_name:my-agent -> agent-id

# Heartbeat TTL (String with TTL)
heartbeat:{agent_id} -> "1" (expires in 30s)

# All agents (Set)
agents:all -> {agent-1, agent-2, ...}
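
To show how these keys work together, here is a minimal in-memory sketch that mirrors the layout with standard collections. `Index` and its methods are hypothetical and exist only for illustration; the crate's storage layer talks to Redis and may order or batch these writes differently.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory mirror of the key layout above.
#[derive(Default)]
struct Index {
    agents: HashMap<String, String>,                 // agent:{agent_id} -> JSON
    by_capability: HashMap<String, HashSet<String>>, // capability:{cap} -> agent IDs
    by_name: HashMap<String, String>,                // agent_name:{name} -> agent ID
    all: HashSet<String>,                            // agents:all
}

impl Index {
    /// Writes every index for one agent.
    /// Returns false and changes nothing if the name is already taken.
    fn register(&mut self, id: &str, name: &str, caps: &[&str], json: &str) -> bool {
        if self.by_name.contains_key(name) {
            return false;
        }
        self.agents.insert(id.to_string(), json.to_string());
        self.by_name.insert(name.to_string(), id.to_string());
        self.all.insert(id.to_string());
        for cap in caps {
            self.by_capability
                .entry(cap.to_string())
                .or_default()
                .insert(id.to_string());
        }
        true
    }

    /// Discovery is a single lookup in the capability index (sorted for stable output).
    fn discover(&self, cap: &str) -> Vec<String> {
        let mut ids: Vec<String> = self
            .by_capability
            .get(cap)
            .map(|set| set.iter().cloned().collect())
            .unwrap_or_default();
        ids.sort();
        ids
    }

    /// Stored JSON for one agent, if present.
    fn get(&self, id: &str) -> Option<&str> {
        self.agents.get(id).map(String::as_str)
    }

    /// Number of registered agents.
    fn total(&self) -> usize {
        self.all.len()
    }
}

fn main() {
    let mut index = Index::default();
    assert!(index.register("agent-1", "nlp-processor", &["nlp", "summarization"], "{}"));
    assert!(index.register("agent-2", "translator", &["nlp"], "{}"));
    // The name index rejects duplicates before anything is written.
    assert!(!index.register("agent-3", "translator", &["nlp"], "{}"));

    println!("nlp agents: {:?}", index.discover("nlp"));
    println!("agent-1 record: {:?}", index.get("agent-1"));
    println!("total agents: {}", index.total());
}
```

The separate name index is what makes the uniqueness check a single key lookup, and the per-capability sets keep discovery independent of the total number of agents.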

Performance

Targets

  • Register: < 5ms (p99)
  • Discover: < 1ms (p99)
  • Heartbeat: < 100μs (p99)
  • Unregister: < 5ms (p99)
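
One way to check a p99 target is to time many calls and take the nearest-rank percentile. This is a minimal sketch, assuming a placeholder workload in place of a real registry call; `percentile` is a hypothetical helper and not part of the crate.

```rust
use std::time::{Duration, Instant};

/// Nearest-rank percentile (hypothetical helper, not part of mxpnexus-registry).
/// Sorts `samples` in place and returns the value at rank ceil(pct/100 * n).
fn percentile(samples: &mut [Duration], pct: usize) -> Option<Duration> {
    if samples.is_empty() || pct == 0 || pct > 100 {
        return None;
    }
    samples.sort();
    let rank = (pct * samples.len() + 99) / 100; // integer ceiling
    Some(samples[rank - 1])
}

fn main() {
    let mut samples = Vec::with_capacity(1_000);
    for i in 0..1_000u64 {
        let start = Instant::now();
        // Placeholder workload; in a real benchmark this would be a registry
        // call such as `registry.heartbeat(&agent_id).await`.
        std::hint::black_box((0..=i).sum::<u64>());
        samples.push(start.elapsed());
    }
    let p99 = percentile(&mut samples, 99).expect("samples are non-empty");
    println!("p99 latency: {:?}", p99);
}
```

Measured numbers depend heavily on where Redis runs relative to the service, so the targets are best compared against measurements taken in the deployment environment.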

Optimizations

  • Connection pooling (20 connections)
  • Async I/O throughout
  • Efficient Redis data structures
  • Background health monitoring

Testing

Running Tests

# Run unit tests (no Redis required)
cargo test

# Run all tests (requires Redis)
docker run -d --name mxpnexus-redis -p 6379:6379 redis:7-alpine
cargo test -- --include-ignored

Test Coverage

  • ✅ 19 tests total (unit + integration)
  • ✅ 8 unit tests (validation logic)
  • ✅ 11 integration tests (Redis operations)
  • ✅ 4 doc tests (API examples), counted separately from the 19 above

Configuration

Environment Variables

# Redis connection
REDIS_URL=redis://127.0.0.1:6379

# Logging
RUST_LOG=info
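
An application embedding the registry can read `REDIS_URL` itself and pass the result to `RegistryService::new`. This is a minimal sketch, assuming the application (not the crate) is responsible for reading the variable; `redis_url_from` is a hypothetical helper, and the fallback is the local address used throughout this README.

```rust
use std::env;

/// Returns the given URL, or the local default when it is unset or blank.
/// Hypothetical helper; not part of mxpnexus-registry.
fn redis_url_from(value: Option<String>) -> String {
    value
        .filter(|v| !v.trim().is_empty())
        .unwrap_or_else(|| "redis://127.0.0.1:6379".to_string())
}

fn main() {
    // `env::var` fails when the variable is missing or not valid Unicode;
    // both cases fall back to the default.
    let url = redis_url_from(env::var("REDIS_URL").ok());
    println!("connecting to {url}");
}
```

Taking the value as an `Option<String>` keeps the fallback logic testable without touching the process environment.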

Heartbeat Settings

// Default: 30s timeout, 5s check interval
let registry = RegistryService::with_config(
    redis_url,
    Duration::from_secs(30), // Heartbeat timeout
).await?;

Error Handling

All operations return Result<T, Error>:

pub enum Error {
    AgentNotFound(String),
    AgentNameExists(String),
    InvalidAgentName(String),
    InvalidCapability(String),
    InvalidCapabilities(String),
    Redis(redis::RedisError),
    Serialization(serde_json::Error),
    Internal(String),
}
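
Callers will usually want to react differently to each variant. This is a minimal sketch of that pattern. It uses a local stand-in for a subset of the enum above (the `Redis` and `Serialization` variants are left out so it compiles without dependencies), and the suggested reactions in `advice` are assumptions for illustration, not behaviour documented by the crate.

```rust
/// Local stand-in for a subset of the crate's `Error` variants.
#[derive(Debug)]
enum Error {
    AgentNotFound(String),
    AgentNameExists(String),
    InvalidAgentName(String),
    InvalidCapability(String),
    Internal(String),
}

/// Maps an error to a human-readable next step (hypothetical policy).
fn advice(err: &Error) -> String {
    match err {
        Error::AgentNotFound(id) => format!("agent {id} is unknown: register it again"),
        Error::AgentNameExists(name) => format!("name {name} is taken: choose another"),
        Error::InvalidAgentName(msg) | Error::InvalidCapability(msg) => {
            format!("fix the request: {msg}")
        }
        Error::Internal(msg) => format!("retry later: {msg}"),
    }
}

fn main() {
    let errors = [
        Error::AgentNotFound("agent-1".to_string()),
        Error::AgentNameExists("nlp-processor".to_string()),
        Error::InvalidAgentName("name too short".to_string()),
        Error::InvalidCapability("uppercase not allowed".to_string()),
        Error::Internal("connection pool exhausted".to_string()),
    ];
    for err in &errors {
        println!("{:?} -> {}", err, advice(err));
    }
}
```

An exhaustive `match` like this has the advantage that the compiler flags any variant the caller has not considered.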

Logging

Uses tracing for structured logging:

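// Initialize logging (in your Rust code). RUST_LOG is honored when
// tracing-subscriber is built with its `env-filter` feature.
tracing_subscriber::fmt::init();

# Set the log level when running (shell)
RUST_LOG=mxpnexus_registry=debug cargo run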

Production Deployment

Docker Compose

version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
  
  registry:
    image: mxpnexus/registry:latest
    ports:
      - "9000:9000"
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - RUST_LOG=info

volumes:
  redis-data:

High Availability

For production, run Redis in a highly available setup such as Redis Sentinel (Redis Cluster support is listed on the roadmap):

// Redis Sentinel
let registry = RegistryService::new(
    "redis://sentinel1:26379,sentinel2:26379,sentinel3:26379/mymaster"
).await?;

Roadmap

  • Redis Cluster support
  • Metrics export (Prometheus)
  • Agent groups/namespaces
  • Rate limiting
  • Multi-capability discovery filters
  • Agent versioning support

License

Dual-licensed under MIT OR Apache-2.0

Contributing

See CONTRIBUTING.md for guidelines.
