Crates.io | rvoip-call-engine |
lib.rs | rvoip-call-engine |
version | 0.1.26 |
created_at | 2025-07-03 07:54:51.234451+00 |
updated_at | 2025-08-15 17:52:19.05335+00 |
description | Call center engine and advanced call management for the rvoip stack |
homepage | https://github.com/eisenzopf/rvoip |
repository | https://github.com/eisenzopf/rvoip |
max_upload_size | |
id | 1735988 |
size | 1,365,353 |
The call-engine library provides a working proof-of-concept call center built on top of the rvoip ecosystem. It does not include the features needed for a production call center, but it does handle agent registration, call queuing, intelligent routing, and B2BUA call bridging with proper audio flow between customers and agents.
dialog-core
and transaction-core
media-core
and rtp-core
session-core
sip-transport
The Call Engine sits at the business logic layer, orchestrating call center operations while delegating protocol and media handling to specialized components:
┌─────────────────────────────────────────┐
│ Call Center Application │
├─────────────────────────────────────────┤
│ rvoip-call-engine ⬅️ YOU ARE HERE
├─────────────────────────────────────────┤
│ rvoip-session-core │
├─────────────────────────────────────────┤
│ rvoip-dialog-core │ rvoip-media-core │
├─────────────────────────────────────────┤
│ rvoip-transaction │ rvoip-rtp-core │
│ -core │ │
├─────────────────────────────────────────┤
│ rvoip-sip-core │
├─────────────────────────────────────────┤
│ Network Layer │
└─────────────────────────────────────────┘
Clean separation of concerns across the call center stack:
┌─────────────────┐ Call Center API ┌─────────────────┐
│ │ ──────────────────────► │ │
│ Management UI │ │ call-engine │
│ (Admin/Agent) │ ◄──────────────────────── │ (Business Logic)│
│ │ Real-time Events │ │
└─────────────────┘ └─────────────────┘
│
Session Coordination │ Queue Management
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ session-core │ │ Database │
│ (SIP Sessions) │ │ (State/Queues) │
└─────────────────┘ └─────────────────┘
related_session_id
┌─────────────────────────────────────────────────────────────┐
│ Call Center Application │
├─────────────────────────────────────────────────────────────┤
│ rvoip-call-engine │
│ ┌─────────────┬─────────────┬─────────────┬─────────────┐ │
│ │ orchestrator│ database │ queue │ routing │ │
│ ├─────────────┼─────────────┼─────────────┼─────────────┤ │
│ │ agent │ calls │ types │ handler │ │
│ └─────────────┴─────────────┴─────────────┴─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ rvoip-session-core │
├─────────────────────────────────────────────────────────────┤
│ dialog-core|transaction-core│media-core│rtp-core│sip-core │
└─────────────────────────────────────────────────────────────┘
orchestrator/core.rs: Main engine coordination (171 lines)
orchestrator/calls.rs: Call processing and B2BUA operations (387 lines)
orchestrator/routing.rs: Routing algorithms and queue management (227 lines)
orchestrator/agents.rs: Agent registration and status management (98 lines)
database/: Database integration with atomic operations
queue/: Queue management with priority and overflow handling
Refactored from monolithic structure to clean, maintainable modules
Add to your Cargo.toml:
[dependencies]
rvoip-call-engine = "0.1.0"
rvoip-session-core = "0.1.0"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1.0"  # the examples below return anyhow::Result
use rvoip_call_engine::prelude::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let engine = CallCenterEngine::new(CallCenterConfig::default()).await?;
println!("🏢 Call Center Server starting on port 5060...");
engine.run().await?;
Ok(())
}
use rvoip_call_engine::{CallCenterEngine, CallCenterConfig, GeneralConfig, DatabaseConfig};
use std::time::Duration; // needed for the monitoring interval below
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize comprehensive logging
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.init();
// Production-grade call center configuration
let config = CallCenterConfig {
general: GeneralConfig {
domain: "call-center.mycompany.com".to_string(),
local_ip: "10.0.1.100".to_string(),
port: 5060,
registrar_domain: "agents.mycompany.com".to_string(),
call_center_service: "support".to_string(),
bye_timeout_seconds: 15,
bye_retry_attempts: 3,
bye_race_delay_ms: 100,
..Default::default()
},
database: DatabaseConfig {
url: "sqlite:production_call_center.db".to_string(),
max_connections: 10,
connection_timeout_seconds: 30,
..Default::default()
},
..Default::default()
};
// Create and validate configuration
config.validate()?;
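// Initialize call center engine. The config is cloned because it is read
// again for the startup log below (assumes CallCenterConfig implements Clone).
let engine = CallCenterEngine::new(config.clone()).await?;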
println!("🏢 Production Call Center Server initializing...");
println!("📊 Features enabled:");
println!(" ✅ Agent SIP Registration");
println!(" ✅ Database-Backed Queuing");
println!(" ✅ Round-Robin Load Balancing");
println!(" ✅ B2BUA Call Bridging");
println!(" ✅ Real-Time Queue Monitoring");
// Start background monitoring tasks
let engine_clone = engine.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Ok(stats) = engine_clone.get_stats().await {
println!("📈 Call Center Stats:");
println!(" 📞 Total Calls: {}", stats.total_calls);
println!(" 🔄 Active Calls: {}", stats.active_calls);
println!(" 👥 Available Agents: {}", stats.available_agents);
println!(" 📋 Queue Depth: {}", stats.total_queued);
println!(" ⏱️ Avg Wait Time: {:.1}s", stats.average_wait_time);
}
}
});
// Start the call center server
println!("🚀 Call Center Server running on {}:{}",
config.general.local_ip, config.general.port);
println!("📞 Ready to receive customer calls and agent registrations");
engine.run().await?;
Ok(())
}
use rvoip_client_core::prelude::*;
use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create SIP client for call center agent
let config = ClientConfig {
sip_uri: "sip:alice@call-center.mycompany.com".to_string(),
server_uri: "sip:10.0.1.100:5060".to_string(),
local_port: 5071,
media: MediaConfig {
preferred_codecs: vec!["opus".to_string(), "G722".to_string(), "PCMU".to_string()],
echo_cancellation: true,
noise_suppression: true,
auto_gain_control: true,
dtmf_enabled: true,
max_bandwidth_kbps: Some(256),
preferred_ptime: Some(20),
..Default::default()
},
..Default::default()
};
let client = ClientManager::new(config).await?;
// Register with call center
println!("👤 Agent Alice registering with call center...");
client.register().await?;
println!("✅ Agent Alice registered and ready for calls");
// Set up call handling
let client_clone = client.clone();
tokio::spawn(async move {
let mut events = client_clone.subscribe_to_events().await;
while let Ok(event) = events.recv().await {
match event {
ClientEvent::IncomingCall { call_id, from, .. } => {
println!("📞 Incoming call from customer: {}", from);
// Accept call after brief delay (simulating agent response time)
let client_inner = client_clone.clone();
let call_id_inner = call_id.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
if let Err(e) = client_inner.answer_call(&call_id_inner).await {
eprintln!("❌ Failed to answer call: {}", e);
} else {
println!("✅ Call {} answered successfully", call_id_inner);
}
});
}
ClientEvent::CallStateChanged { call_id, new_state, .. } => {
match new_state {
CallState::Connected => {
println!("🔊 Call {} - Audio connected with customer", call_id);
}
CallState::Terminated => {
println!("📴 Call {} completed", call_id);
}
_ => println!("📱 Call {} state: {:?}", call_id, new_state),
}
}
ClientEvent::ErrorOccurred { error, .. } => {
eprintln!("❌ Agent error: {}", error);
}
_ => {}
}
}
});
// Keep agent running
println!("🎧 Agent Alice ready for calls - Press Ctrl+C to exit");
tokio::signal::ctrl_c().await?;
println!("👋 Agent Alice signing off");
Ok(())
}
# Run the comprehensive end-to-end test suite
cd examples/e2e_test
./run_e2e_test.sh
# What this tests:
# ✅ 2 agents register with call center (Alice & Bob)
# ✅ 5 customer calls placed simultaneously via SIPp
# ✅ Calls distributed fairly (typically 3/2 or 2/3 split)
# ✅ Full audio flow between customers and agents
# ✅ Proper call termination and cleanup
# ✅ Queue processing and agent status management
# Expected output:
# "📊 Test Results: Alice: 3 calls, Bob: 2 calls ✅"
# "🎯 All calls completed successfully with proper load balancing"
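The 3/2 split in the expected output is what strict round-robin assignment produces for five calls and two agents. The following is a minimal, standard-library-only sketch of that idea; `RoundRobin` and `distribute` are hypothetical names for illustration and are not part of the rvoip-call-engine API. In the real engine the split can also come out 2/3, since it depends on registration order and timing.

```rust
use std::collections::HashMap;

/// Hypothetical round-robin dispatcher: hands each call to the next agent in turn.
struct RoundRobin {
    agents: Vec<String>,
    next: usize,
}

impl RoundRobin {
    fn new(agents: &[&str]) -> Self {
        Self {
            agents: agents.iter().map(|a| a.to_string()).collect(),
            next: 0,
        }
    }

    /// Returns the agent for the next call, or None when no agents are registered.
    fn assign(&mut self) -> Option<&str> {
        if self.agents.is_empty() {
            return None;
        }
        let idx = self.next;
        // Advance the cursor and wrap around at the end of the agent list.
        self.next = (self.next + 1) % self.agents.len();
        Some(self.agents[idx].as_str())
    }
}

/// Distributes `calls` calls and returns how many each agent received.
fn distribute(agents: &[&str], calls: usize) -> HashMap<String, usize> {
    let mut rr = RoundRobin::new(agents);
    let mut counts = HashMap::new();
    for _ in 0..calls {
        if let Some(agent) = rr.assign() {
            *counts.entry(agent.to_string()).or_insert(0) += 1;
        }
    }
    counts
}
```

Calling `distribute(&["alice", "bob"], 5)` assigns calls 1, 3 and 5 to alice and calls 2 and 4 to bob.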
The call engine properly respects and propagates configured bind addresses:
// Configure specific IP addresses for your deployment
let config = CallCenterConfig {
general: GeneralConfig {
local_signaling_addr: "173.225.104.102:5060".parse()?, // Your server's public IP
local_media_addr: "173.225.104.102:20000".parse()?, // Same IP for media
..Default::default()
},
..Default::default()
};
// The engine ensures these addresses propagate to all layers
let engine = CallCenterEngine::new(config).await?;
Key points:
local_media_addr
port
For automatic media port allocation, the engine uses the port from local_media_addr as the base and allocates a range from there (typically +1000 ports).
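To make the port arithmetic concrete, here is a small sketch of how such a range could be derived from the configured address. It assumes a fixed width of 1000 ports (the text only says "typically"), and `media_port_range` and `ASSUMED_RANGE_WIDTH` are hypothetical names, not the engine's own allocator.

```rust
use std::net::{AddrParseError, SocketAddr};
use std::ops::RangeInclusive;

/// Assumed width of the media port range; the README only says "typically +1000 ports".
const ASSUMED_RANGE_WIDTH: u16 = 1000;

/// Derives a candidate RTP port range from the configured media address.
/// The upper bound saturates at 65535 so a high base port cannot overflow.
fn media_port_range(local_media_addr: &str) -> Result<RangeInclusive<u16>, AddrParseError> {
    let addr: SocketAddr = local_media_addr.parse()?;
    let base = addr.port();
    Ok(base..=base.saturating_add(ASSUMED_RANGE_WIDTH))
}
```

With the address from the example above, `media_port_range("173.225.104.102:20000")` yields ports 20000 through 21000, which is the range a firewall rule for RTP would need to cover under this assumption.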
use rvoip_call_engine::config::*;
let config = CallCenterConfig {
general: GeneralConfig {
domain: "enterprise.call-center.com".to_string(),
local_ip: "192.168.1.100".to_string(),
port: 5060,
registrar_domain: "agents.enterprise.com".to_string(),
call_center_service: "premium-support".to_string(),
// Enhanced timeout configuration
bye_timeout_seconds: 20, // Longer timeout for enterprise
bye_retry_attempts: 5, // More retries for reliability
bye_race_delay_ms: 150, // Prevent race conditions
// Call duration and agent heartbeat settings
max_call_duration_minutes: 120, // 2-hour call limit
agent_heartbeat_interval: 30, // 30-second heartbeats
..Default::default()
},
database: DatabaseConfig {
url: "sqlite:enterprise_call_center.db".to_string(),
max_connections: 20, // Higher concurrency
connection_timeout_seconds: 45,
pool_idle_timeout_seconds: 300,
enable_wal_mode: true, // Better performance
enable_foreign_keys: true, // Data integrity
..Default::default()
},
// Queue configuration
queues: vec![
QueueConfig {
name: "vip".to_string(),
capacity: 50,
priority_base: 100, // Higher priority
overflow_queue: Some("premium".to_string()),
max_wait_time_seconds: 60, // VIP gets faster service
..Default::default()
},
QueueConfig {
name: "premium".to_string(),
capacity: 100,
priority_base: 75,
overflow_queue: Some("general".to_string()),
max_wait_time_seconds: 120,
..Default::default()
},
QueueConfig {
name: "general".to_string(),
capacity: 200,
priority_base: 50,
overflow_queue: None, // No overflow
max_wait_time_seconds: 300, // 5-minute max wait
..Default::default()
},
],
// Routing configuration
routing: RoutingConfig {
default_strategy: RoutingStrategy::RoundRobin,
enable_skills_routing: false, // Phase 2 feature
enable_overflow: true,
max_queue_depth: 500,
queue_monitor_interval_ms: 1000,
agent_assignment_timeout_seconds: 45,
..Default::default()
},
..Default::default()
};
let engine = CallCenterEngine::new(config).await?;
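The overflow_queue settings above form a chain (vip, then premium, then general). The sketch below shows one way such a chain could be resolved when a queue is full. It uses a local stand-in struct rather than the crate's QueueConfig, and the `depth` field and `place_call` function are assumptions made for illustration; the engine's actual overflow logic may differ.

```rust
use std::collections::{HashMap, HashSet};

/// Local stand-in for the queue settings used above (not the crate's type).
struct Queue {
    name: &'static str,
    capacity: usize,
    /// Calls currently waiting (hypothetical runtime state).
    depth: usize,
    overflow_queue: Option<&'static str>,
}

/// Follows `overflow_queue` links from `start` until a queue with free
/// capacity is found. Returns None if every queue in the chain is full,
/// the chain names an unknown queue, or the links form a cycle.
fn place_call(queues: &[Queue], start: &str) -> Option<&'static str> {
    let by_name: HashMap<&str, &Queue> = queues.iter().map(|q| (q.name, q)).collect();
    let mut visited = HashSet::new();
    let mut current = start;
    loop {
        // A repeated name means the overflow links loop back on themselves.
        if !visited.insert(current) {
            return None;
        }
        let q = by_name.get(current)?;
        if q.depth < q.capacity {
            return Some(q.name);
        }
        current = q.overflow_queue?;
    }
}
```

For example, if vip (capacity 50) and premium (capacity 100) are both full while general has room, `place_call(&queues, "vip")` returns `Some("general")`. The cycle check matters because nothing in the configuration shape prevents two queues from naming each other as overflow targets.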
Call-engine provides a solid foundation for various call center applications:
related_session_id
Run the comprehensive test suite:
# Run end-to-end tests
cd examples/e2e_test
./run_e2e_test.sh
# Run unit tests
cargo test -p rvoip-call-engine
# Run integration tests
cargo test -p rvoip-call-engine --test integration_tests
# Run performance benchmarks
cargo test -p rvoip-call-engine --release -- --ignored benchmark
Test Coverage: Complete end-to-end validation
# Complete E2E test with load balancing verification
cd examples/e2e_test
./run_e2e_test.sh
# Manual server testing
cargo run --example call_center_server
# Agent client testing
cargo run --example agent_client alice 5071
cargo run --example agent_client bob 5072
# SIPp customer simulation
sipp -sf customer_calls.xml 127.0.0.1:5060 -m 5 -r 5 -rp 1000 -max_socket 100
pub struct CallCenterConfig {
pub general: GeneralConfig, // Server and network configuration
pub database: DatabaseConfig, // Database connection settings
pub queues: Vec<QueueConfig>, // Queue definitions
pub routing: RoutingConfig, // Routing algorithm settings
}
pub struct GeneralConfig {
pub domain: String, // SIP domain
pub local_ip: String, // Local IP for SIP URIs
pub port: u16, // SIP port (default: 5060)
pub registrar_domain: String, // Agent registration domain
pub call_center_service: String, // Service name for URIs
pub bye_timeout_seconds: u64, // BYE timeout (default: 15s)
pub bye_retry_attempts: u32, // BYE retry count (default: 3)
pub bye_race_delay_ms: u64, // Race condition prevention (default: 100ms)
}
pub struct DatabaseConfig {
pub url: String, // Database URL
pub max_connections: u32, // Connection pool size
pub connection_timeout_seconds: u64, // Connection timeout
pub enable_wal_mode: bool, // WAL mode for performance
pub enable_foreign_keys: bool, // Foreign key constraints
}
pub struct QueueConfig {
pub name: String, // Queue identifier
pub capacity: usize, // Maximum queue size
pub priority_base: i32, // Base priority value
pub overflow_queue: Option<String>, // Overflow destination
pub max_wait_time_seconds: u64, // Maximum wait time
}
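The examples in this document rely on `..Default::default()` to fill in every field they do not set explicitly. As an illustration of that pattern, the sketch below mirrors a subset of the GeneralConfig fields in a local struct and implements Default with the values documented in the comments above (port 5060, 15 s BYE timeout, 3 retries, 100 ms race delay). `GeneralSettings` and `enterprise_settings` are hypothetical names, and the `domain` default is a placeholder, not the crate's value.

```rust
/// Local mirror of a subset of the fields listed above, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct GeneralSettings {
    domain: String,
    port: u16,
    bye_timeout_seconds: u64,
    bye_retry_attempts: u32,
    bye_race_delay_ms: u64,
}

impl Default for GeneralSettings {
    fn default() -> Self {
        Self {
            domain: "localhost".to_string(), // placeholder, not the crate's default
            port: 5060,                      // documented default SIP port
            bye_timeout_seconds: 15,         // documented default
            bye_retry_attempts: 3,           // documented default
            bye_race_delay_ms: 100,          // documented default
        }
    }
}

/// Overrides only the domain and BYE timeout; every other field keeps its default.
fn enterprise_settings() -> GeneralSettings {
    GeneralSettings {
        domain: "enterprise.call-center.com".to_string(),
        bye_timeout_seconds: 20,
        ..Default::default()
    }
}
```

Here `enterprise_settings()` ends up with port 5060, 3 retry attempts and a 100 ms race delay without naming them, which is why the configuration examples stay short even as the structs grow.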
The library provides comprehensive error handling with operational error recovery:
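use rvoip_call_engine::{CallCenterError, CallCenterEngine};
// `call_center_result` and the recovery helpers called below
// (attempt_database_recovery, cleanup_agent_state, handle_queue_overflow, ...)
// are application-defined placeholders, not functions provided by this snippet.
match call_center_result {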
Err(CallCenterError::DatabaseError(msg)) => {
log::error!("Database error: {}", msg);
// Implement database failover or retry logic
attempt_database_recovery().await?;
}
Err(CallCenterError::AgentNotFound(agent_id)) => {
log::warn!("Agent {} not found, may have disconnected", agent_id);
// Clean up agent state and re-queue calls
cleanup_agent_state(&agent_id).await?;
}
Err(CallCenterError::QueueFull(queue_name)) => {
log::warn!("Queue {} full, implementing overflow", queue_name);
// Route to overflow queue or callback system
handle_queue_overflow(&queue_name).await?;
}
Err(CallCenterError::SessionError(msg)) => {
log::error!("Session error: {}", msg);
// Implement session recovery
attempt_session_recovery().await?;
}
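Err(other) => {
// Catch-all so the match stays exhaustive if the enum has further variants
log::error!("Unhandled call center error: {:?}", other);
}
Ok(engine) => {
// Handle successful call center operation
start_monitoring_dashboard(&engine).await?;
}
}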
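The recovery comments in the match above boil down to a policy decision: which errors are worth retrying and which should be handed to other logic. The sketch below shows one possible policy in synchronous, standard-library-only Rust. `EngineError` is a local stand-in with the same four variants shown above, and `is_retryable` and `with_retry` are hypothetical helpers; treating database and session errors as transient is an assumption, not documented engine behavior.

```rust
use std::fmt;

/// Local stand-in mirroring the four error variants shown above.
#[derive(Debug)]
enum EngineError {
    Database(String),
    AgentNotFound(String),
    QueueFull(String),
    Session(String),
}

impl fmt::Display for EngineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EngineError::Database(msg) => write!(f, "database error: {}", msg),
            EngineError::AgentNotFound(id) => write!(f, "agent not found: {}", id),
            EngineError::QueueFull(name) => write!(f, "queue full: {}", name),
            EngineError::Session(msg) => write!(f, "session error: {}", msg),
        }
    }
}

impl std::error::Error for EngineError {}

/// Assumed policy: database and session errors may be transient, while a
/// missing agent or a full queue needs different handling, not a retry.
fn is_retryable(err: &EngineError) -> bool {
    matches!(err, EngineError::Database(_) | EngineError::Session(_))
}

/// Runs `op` up to `max_attempts` times, retrying only retryable errors.
fn with_retry<T>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, EngineError>,
) -> Result<T, EngineError> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) if is_retryable(&e) && attempt < max_attempts => attempt += 1,
            Err(e) => return Err(e),
        }
    }
}
```

An async version would look the same apart from awaiting `op` and, most likely, sleeping between attempts; the bye_retry_attempts setting described earlier is a natural source for `max_attempts`.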
Run the comprehensive test suite:
# Run all tests
cargo test -p rvoip-call-engine
# Run end-to-end tests
cd examples/e2e_test && ./run_e2e_test.sh
# Run specific test suites
cargo test -p rvoip-call-engine agent_management
cargo test -p rvoip-call-engine queue_operations
cargo test -p rvoip-call-engine routing_algorithms
# Run performance benchmarks
cargo test -p rvoip-call-engine --release -- --ignored benchmark
The library includes comprehensive examples demonstrating all features:
# Complete call center server
cargo run --example call_center_server
# Agent client applications
cargo run --example agent_client alice 5071
cargo run --example agent_client bob 5072
# Configuration examples
cargo run --example enterprise_config
cargo run --example multi_queue_config
# Testing and validation
cd examples/e2e_test
./run_comprehensive_tests.sh
Contributions are welcome! Please see the main rvoip contributing guidelines for details.
For call-engine specific contributions:
The modular architecture makes it easy to contribute:
orchestrator/core.rs - Main engine coordination
orchestrator/calls.rs - Call processing and B2BUA operations
orchestrator/routing.rs - Routing algorithms and logic
orchestrator/agents.rs - Agent management and registration
database/ - Database operations and schema management
queue/ - Queue management and monitoring
Development Status: ✅ Production-Ready Call Center
Production Readiness: ✅ Ready for Small to Medium Call Centers
Current Capabilities: ✅ Production-Ready Core Features
Current Limitations: ⚠️ Enterprise Features Planned
Recent Major Fixes: 🔧 Critical Issues Resolved
Known Minor Issues: ⚠️ Non-Critical (Being Addressed)
Roadmap Progress: 📈 Phase 0 Complete, Phase 1 Ready
This project is licensed under either of
at your option.
Built with ❤️ for the Rust VoIP community - Production-ready call center development made simple