Crates.io | rvoip-session-core |
lib.rs | rvoip-session-core |
version | 0.1.26 |
created_at | 2025-07-03 07:51:07.036847+00 |
updated_at | 2025-08-15 17:50:38.780507+00 |
description | Call session management for the rvoip stack |
homepage | https://github.com/eisenzopf/rvoip |
repository | https://github.com/eisenzopf/rvoip |
max_upload_size | |
id | 1735979 |
size | 2,163,351 |
The session-core library provides high-level SIP session coordination and management capabilities for the rvoip VoIP stack. It orchestrates SIP dialog lifecycles, media session coordination, and call control operations while integrating seamlessly with dialog-core (SIP protocol), media-core (audio processing), and call-engine (business logic).
dialog-core and transaction-core
media-core
call-engine
rtp-core
sip-core
The Session Core sits at the coordination layer, providing high-level session management while delegating protocol details to specialized components:
┌─────────────────────────────────────────┐
│            Application Layer            │
├─────────────────────────────────────────┤
│            rvoip-call-engine            │
├─────────────────────────────────────────┤
│            rvoip-session-core           │ ⬅️ YOU ARE HERE
├─────────────────────────────────────────┤
│ rvoip-dialog-core  │  rvoip-media-core  │
├─────────────────────────────────────────┤
│ rvoip-transaction  │   rvoip-rtp-core   │
│       -core        │                    │
├─────────────────────────────────────────┤
│             rvoip-sip-core              │
├─────────────────────────────────────────┤
│              Network Layer              │
└─────────────────────────────────────────┘
Clean separation of concerns across the rvoip stack:
┌─────────────────┐     Session Events      ┌─────────────────┐
│                 │ ──────────────────────► │                 │
│   call-engine   │                         │  session-core   │
│ (Business Logic)│ ◄────────────────────── │ (Coordination)  │
│                 │    Call Control API     │                 │
└─────────────────┘                         └─────────────────┘
                                                     │
                                         SIP Dialogs │ Media Sessions
                                          ▼                     ▼
                                 ┌─────────────────┐   ┌─────────────────┐
                                 │   dialog-core   │   │   media-core    │
                                 │ (SIP Protocol)  │   │ (Audio Process) │
                                 └─────────────────┘   └─────────────────┘
create_outgoing_call() with automatic SDP generation
wait_for_answer() with configurable timeouts
hold_session(), resume_session(), transfer_session() operations
send_dtmf() tone transmission and terminate_session() cleanup
accept_incoming_call() and reject_incoming_call()
generate_sdp_offer() and generate_sdp_answer() with real capabilities
establish_media_flow() with actual RTP session coordination
get_media_statistics() with real quality metrics (MOS, jitter, packet loss)
start_statistics_monitoring() for continuous quality tracking
start_audio_transmission(), stop_audio_transmission()
on_incoming_call() → CallDecision flow
CallDecision::Defer for async processing
bridge_sessions() for connecting two active calls
BasicSessionEvent with all session lifecycle events
BasicEventBus for session-to-session communication
SessionManager::new(config) - One-line manager creation
set_call_handler(handler) - Single method for all call events
start_server(addr) - One-line server startup
01_basic_infrastructure.rs - Clean infrastructure setup without protocol exposure
02_session_lifecycle.rs - Complete session management patterns
03_event_handling.rs - Event bus integration and cross-session communication
04_media_coordination.rs - Media session coordination with quality monitoring
use rvoip_session_core::api::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let session_manager = SessionManager::new(SessionConfig::server("127.0.0.1:5060")?).await?;
    session_manager.set_call_handler(Arc::new(AutoAnswerHandler)).await?;
    session_manager.start_server("127.0.0.1:5060".parse()?).await?;
    println!("🚀 SIP server running - auto-answering all calls");
    tokio::signal::ctrl_c().await?;
    Ok(())
}
use rvoip_session_core::api::*;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Production-grade session coordinator
    let coordinator = SessionManagerBuilder::new()
        .with_sip_port(5060)
        .with_local_address("sip:callcenter@pbx.company.com:5060")
        .with_rtp_port_range(10000, 20000)
        .with_max_sessions(1000)
        .with_session_timeout(Duration::from_secs(3600))
        .with_handler(Arc::new(CallCenterHandler::new()))
        .build()
        .await?;
    // Start accepting calls
    SessionControl::start(&coordinator).await?;
    // Monitor system health (monitor_system_health is an application-defined task, not shown)
    tokio::spawn(monitor_system_health(coordinator.clone()));
    // Wait for shutdown signal
    tokio::signal::ctrl_c().await?;
    SessionControl::stop(&coordinator).await?;
    Ok(())
}

#[derive(Debug)]
struct CallCenterHandler {
    queue: Arc<Mutex<VecDeque<IncomingCall>>>,
}

impl CallCenterHandler {
    // Start with an empty queue of deferred calls
    fn new() -> Self {
        Self { queue: Arc::new(Mutex::new(VecDeque::new())) }
    }
}

#[async_trait]
impl CallHandler for CallCenterHandler {
    async fn on_incoming_call(&self, call: IncomingCall) -> CallDecision {
        // Route based on called number
        match call.to.as_str() {
            "sip:support@pbx.company.com" => {
                self.queue.lock().unwrap().push_back(call);
                CallDecision::Defer // Queue for agent assignment
            }
            "sip:sales@pbx.company.com" => {
                CallDecision::Forward("sip:sales-queue@internal".to_string())
            }
            _ => CallDecision::Accept(None) // Auto-answer for other calls
        }
    }

    async fn on_call_established(&self, call: CallSession, _local_sdp: Option<String>, _remote_sdp: Option<String>) {
        println!("📞 Call {} established with real media", call.id());
        // Start quality monitoring for all calls
        let coordinator = call.coordinator();
        MediaControl::start_statistics_monitoring(
            &coordinator,
            call.id(),
            Duration::from_secs(5)
        ).await.ok();
    }

    async fn on_call_ended(&self, call: CallSession, reason: &str) {
        println!("📞 Call {} ended: {} (Duration: {:?})",
            call.id(), reason, call.duration());
    }
}
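The routing decision in on_incoming_call() depends only on the called URI, so it can be pulled out into a pure function and unit-tested without a running SIP stack. The following is a minimal sketch of that idea. RouteDecision and route_by_called_uri are hypothetical names introduced here for illustration; RouteDecision is a local stand-in for the crate's CallDecision, not the real type.

```rust
/// Hypothetical stand-in for the crate's `CallDecision`, reduced to the
/// three outcomes used in the call center example.
#[derive(Debug, PartialEq)]
enum RouteDecision {
    Accept,
    Defer,
    Forward(String),
}

/// Maps the called URI to a routing decision. The URIs are the ones from
/// the example above; everything else is auto-answered.
fn route_by_called_uri(to: &str) -> RouteDecision {
    match to {
        // Support calls are queued until an agent is free
        "sip:support@pbx.company.com" => RouteDecision::Defer,
        // Sales calls are forwarded to an internal queue
        "sip:sales@pbx.company.com" => {
            RouteDecision::Forward("sip:sales-queue@internal".to_string())
        }
        // Any other destination is accepted directly
        _ => RouteDecision::Accept,
    }
}
```

A handler could then call such a function and translate the result into the crate's own decision type, keeping the side effects (queueing, logging) in the handler itself.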
Session-core respects configured bind addresses and propagates them through all layers:
// Configure specific IP addresses for production deployment
let coordinator = SessionManagerBuilder::new()
    .with_sip_port(5060)
    .with_local_bind_addr("173.225.104.102:5060".parse()?) // Your server's IP
    .with_media_ports(10000, 20000)
    .build()
    .await?;
Key points:
The library supports automatic port allocation when you use port 0:
// Port 0 signals automatic allocation from the configured range
let config = SessionManagerConfig {
    local_bind_addr: "192.168.1.100:0".parse()?, // Port 0 = auto
    media_port_start: 10000,
    media_port_end: 20000,
    ..Default::default()
};
How it works: port 0 signals automatic allocation, and ports are then taken from the configured range media_port_start to media_port_end.
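To make the idea of allocating from a configured range concrete, here is a minimal sketch of a range-based port allocator. It is not the crate's implementation, whose strategy is not described here; PortAllocator and its methods are hypothetical names, and the first-free linear scan is an assumption chosen for simplicity.

```rust
use std::collections::BTreeSet;

/// Hypothetical allocator that hands out ports from an inclusive range,
/// analogous to `media_port_start..=media_port_end`.
struct PortAllocator {
    start: u16,
    end: u16,
    in_use: BTreeSet<u16>,
}

impl PortAllocator {
    fn new(start: u16, end: u16) -> Self {
        assert!(start <= end, "port range must not be empty");
        Self { start, end, in_use: BTreeSet::new() }
    }

    /// Returns the lowest free port in the range, or `None` when exhausted.
    fn allocate(&mut self) -> Option<u16> {
        let port = (self.start..=self.end).find(|p| !self.in_use.contains(p))?;
        self.in_use.insert(port);
        Some(port)
    }

    /// Marks a port as free again; returns `false` if it was not allocated.
    fn release(&mut self, port: u16) -> bool {
        self.in_use.remove(&port)
    }
}
```

Returning None on exhaustion lets the caller decide whether to reject the call or wait, rather than panicking inside the allocator.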
// Example 1: Production server with specific IP
let coordinator = SessionManagerBuilder::new()
    .with_sip_port(5060)
    .with_local_bind_addr("203.0.113.10:5060".parse()?)
    .with_media_ports(30000, 40000) // Custom RTP range
    .build()
    .await?;

// Example 2: Development with automatic ports
let coordinator = SessionManagerBuilder::new()
    .with_sip_port(0) // Let OS assign SIP port
    .with_local_bind_addr("127.0.0.1:0".parse()?)
    .with_media_ports(10000, 11000)
    .build()
    .await?;

// Example 3: Docker/container with all interfaces
let coordinator = SessionManagerBuilder::new()
    .with_sip_port(5060)
    .with_local_bind_addr("0.0.0.0:5060".parse()?) // Bind all interfaces
    .with_media_ports(10000, 20000)
    .build()
    .await?;
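The three bind address styles above (specific IP, loopback with port 0, all interfaces) can be told apart with the standard library alone. The sketch below uses only std::net::SocketAddr; BindKind and classify_bind_addr are hypothetical helpers for illustration and are not part of the crate. It may be useful for validating configuration before handing it to the builder.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical classification of a bind address string.
#[derive(Debug, PartialEq)]
enum BindKind {
    AllInterfaces, // e.g. 0.0.0.0
    Loopback,      // e.g. 127.0.0.1
    Specific,      // any other concrete address
}

/// Parses `s` and reports what kind of address it is and whether the
/// port is 0, which requests automatic port assignment.
fn classify_bind_addr(s: &str) -> Result<(BindKind, bool), AddrParseError> {
    let addr: SocketAddr = s.parse()?;
    let kind = if addr.ip().is_unspecified() {
        BindKind::AllInterfaces
    } else if addr.ip().is_loopback() {
        BindKind::Loopback
    } else {
        BindKind::Specific
    };
    Ok((kind, addr.port() == 0))
}
```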
use rvoip_session_core::api::*;
use std::sync::Arc;
use std::time::Duration;

async fn handle_call_with_quality_monitoring(
    coordinator: Arc<SessionCoordinator>,
    session_id: SessionId
) -> Result<()> {
    // Get real media information
    let media_info = MediaControl::get_media_info(&coordinator, &session_id).await?;
    if let Some(info) = media_info {
        println!("📊 Media Session Info:");
        println!(" Local RTP: {}:{}", info.local_rtp_address, info.local_rtp_port);
        println!(" Remote RTP: {}:{}", info.remote_rtp_address, info.remote_rtp_port);
        println!(" Codec: {:?}", info.codec);
    }

    // Start comprehensive quality monitoring
    MediaControl::start_statistics_monitoring(
        &coordinator,
        &session_id,
        Duration::from_secs(2)
    ).await?;

    // Monitor call quality in real-time
    let mut poor_quality_strikes = 0;
    let mut quality_history = Vec::new();
    while let Some(session) = SessionControl::get_session(&coordinator, &session_id).await? {
        if session.state().is_final() {
            break;
        }
        // Get comprehensive media statistics
        if let Some(stats) = MediaControl::get_media_statistics(&coordinator, &session_id).await? {
            if let Some(quality) = stats.quality_metrics {
                let mos = quality.mos_score.unwrap_or(0.0);
                let packet_loss = quality.packet_loss_percent;
                let jitter = quality.jitter_ms;
                let rtt = quality.round_trip_time_ms;
                println!("📊 Real-Time Quality Metrics:");
                println!(" MOS Score: {:.1} ({})", mos, quality_rating(mos));
                println!(" Packet Loss: {:.2}%", packet_loss);
                println!(" Jitter: {:.1}ms", jitter);
                println!(" RTT: {:.0}ms", rtt);
                quality_history.push(mos);

                // Adaptive quality management
                if mos < 3.0 || packet_loss > 5.0 {
                    poor_quality_strikes += 1;
                    println!("⚠️ Poor quality detected (strike {}/3)", poor_quality_strikes);
                    if poor_quality_strikes >= 3 {
                        println!("🚨 Sustained poor quality - taking action!");
                        // Could trigger codec switching, bandwidth adaptation, etc.
                        // For now, we'll just alert
                        // (notify_operations_team is an application-defined helper, not shown)
                        notify_operations_team(&session_id, mos, packet_loss).await?;
                        poor_quality_strikes = 0; // Reset after action
                    }
                } else {
                    poor_quality_strikes = 0; // Reset on good quality
                }
            }
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }

    // Final quality report
    if !quality_history.is_empty() {
        let avg_mos = quality_history.iter().sum::<f64>() / quality_history.len() as f64;
        let min_mos = quality_history.iter().fold(f64::INFINITY, |a, &b| a.min(b));
        let max_mos = quality_history.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
        println!("📊 Final Call Quality Report:");
        println!(" Average MOS: {:.1}", avg_mos);
        println!(" Min MOS: {:.1}", min_mos);
        println!(" Max MOS: {:.1}", max_mos);
        println!(" Quality Samples: {}", quality_history.len());
    }
    Ok(())
}

fn quality_rating(mos: f64) -> &'static str {
    match mos {
        x if x >= 4.0 => "Excellent",
        x if x >= 3.5 => "Good",
        x if x >= 3.0 => "Fair",
        x if x >= 2.5 => "Poor",
        _ => "Bad"
    }
}
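The "three strikes" rule in the monitoring loop above is easy to get subtly wrong when it is inlined among the print statements, so it may help to isolate it. The sketch below encapsulates the same thresholds (MOS below 3.0 or packet loss above 5%) in a small struct. QualityStrikes is a hypothetical helper written for this illustration; it does not come from the crate.

```rust
/// Hypothetical helper that counts consecutive poor-quality samples.
struct QualityStrikes {
    strikes: u32,
    limit: u32,
}

impl QualityStrikes {
    fn new(limit: u32) -> Self {
        Self { strikes: 0, limit }
    }

    /// Records one sample. Returns `true` when `limit` consecutive poor
    /// samples have been seen, then resets the counter. A good sample
    /// also resets the counter.
    fn observe(&mut self, mos: f64, packet_loss_percent: f64) -> bool {
        let poor = mos < 3.0 || packet_loss_percent > 5.0;
        if !poor {
            self.strikes = 0;
            return false;
        }
        self.strikes += 1;
        if self.strikes >= self.limit {
            self.strikes = 0; // reset after signalling that action is due
            return true;
        }
        false
    }
}
```

With such a helper, the loop body reduces to calling observe() with each sample and alerting when it returns true.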
use rvoip_session_core::api::*;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug)]
struct DatabaseDrivenHandler {
    pending_calls: Arc<Mutex<VecDeque<IncomingCall>>>,
    db_pool: Arc<DatabasePool>,
}

#[async_trait]
impl CallHandler for DatabaseDrivenHandler {
    async fn on_incoming_call(&self, call: IncomingCall) -> CallDecision {
        println!("📞 Incoming call from {} - queuing for database lookup", call.from);
        // Queue for async processing
        self.pending_calls.lock().unwrap().push_back(call);
        CallDecision::Defer
    }

    async fn on_call_ended(&self, call: CallSession, reason: &str) {
        // Update call records in database
        // (update_call_record is an application-defined method, not shown)
        if let Err(e) = self.update_call_record(&call, reason).await {
            eprintln!("Failed to update call record: {}", e);
        }
    }
}

impl DatabaseDrivenHandler {
    async fn process_pending_calls(&self, coordinator: Arc<SessionCoordinator>) -> Result<()> {
        loop {
            // Take all queued calls; the lock is released before any await
            let calls: Vec<IncomingCall> = {
                let mut pending = self.pending_calls.lock().unwrap();
                pending.drain(..).collect()
            };
            for call in calls {
                match self.lookup_caller_authorization(&call.from).await {
                    Ok(caller_info) => {
                        if caller_info.is_authorized {
                            // Generate SDP answer using real media capabilities
                            let sdp_answer = if let Some(offer) = &call.sdp {
                                Some(MediaControl::generate_sdp_answer(
                                    &coordinator,
                                    &call.id,
                                    offer
                                ).await?)
                            } else {
                                None
                            };
                            // Accept the call
                            SessionControl::accept_incoming_call(
                                &coordinator,
                                &call,
                                sdp_answer
                            ).await?;
                            println!("✅ Accepted call from authorized user: {} (Account: {})",
                                call.from, caller_info.account_id);
                        } else {
                            // Reject unauthorized callers
                            SessionControl::reject_incoming_call(
                                &coordinator,
                                &call,
                                "Account not authorized for calling"
                            ).await?;
                            println!("❌ Rejected unauthorized call from: {}", call.from);
                        }
                    }
                    Err(e) => {
                        eprintln!("Database lookup failed for {}: {}", call.from, e);
                        // Reject on database error (fail closed)
                        SessionControl::reject_incoming_call(
                            &coordinator,
                            &call,
                            "Service temporarily unavailable"
                        ).await?;
                    }
                }
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    async fn lookup_caller_authorization(&self, caller: &str) -> Result<CallerInfo> {
        sqlx::query_as!(
            CallerInfo,
            "SELECT account_id, is_authorized, max_concurrent_calls FROM callers WHERE sip_uri = $1",
            caller
        )
        .fetch_one(self.db_pool.as_ref())
        .await
        .map_err(|e| anyhow::anyhow!("Database query failed: {}", e))
    }
}
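The deferred-call pattern above relies on one detail: the queue is drained inside a short scope so the std::sync::Mutex guard is dropped before any .await. The sketch below isolates that pattern with a generic queue. PendingQueue is a hypothetical type introduced for this illustration, using only the standard library.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical shared queue for items deferred by a handler.
struct PendingQueue<T> {
    inner: Arc<Mutex<VecDeque<T>>>,
}

impl<T> PendingQueue<T> {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(VecDeque::new())) }
    }

    /// Called from the handler: enqueue and return immediately.
    fn push(&self, item: T) {
        self.inner.lock().unwrap().push_back(item);
    }

    /// Called from the worker: takes everything queued so far, in FIFO
    /// order. The guard is dropped when this function returns, so the
    /// caller never holds the lock while doing slow work.
    fn take_all(&self) -> Vec<T> {
        let mut guard = self.inner.lock().unwrap();
        let items: Vec<T> = guard.drain(..).collect();
        items
    }
}
```

Draining into a Vec keeps the critical section short, so handlers that enqueue new calls are not blocked while the worker performs database lookups.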
use rvoip_session_core::api::*;
use std::sync::Arc;
use std::time::Duration;

async fn setup_conference_bridge(coordinator: Arc<SessionCoordinator>) -> Result<()> {
    println!("🎥 Setting up conference bridge...");
    // Create calls to participants
    let participants = vec![
        "sip:alice@company.com",
        "sip:bob@company.com",
        "sip:charlie@partner.com"
    ];
    let mut sessions = Vec::new();

    // Initiate calls to all participants
    for participant in &participants {
        let session = SessionControl::create_outgoing_call(
            &coordinator,
            "sip:conference@pbx.company.com",
            participant,
            None
        ).await?;
        println!("📞 Calling {}...", participant);
        sessions.push(session);
    }

    // Wait for all participants to answer. Each session is zipped with its
    // participant URI so log lines stay correct even after someone fails to answer.
    let mut answered_sessions = Vec::new();
    for (participant, session) in participants.iter().zip(sessions) {
        match SessionControl::wait_for_answer(
            &coordinator,
            &session.id,
            Duration::from_secs(30)
        ).await {
            Ok(_) => {
                println!("✅ {} answered", participant);
                answered_sessions.push(session);
            }
            Err(e) => {
                println!("❌ {} didn't answer: {}", participant, e);
                SessionControl::terminate_session(&coordinator, &session.id).await?;
            }
        }
    }

    if answered_sessions.len() < 2 {
        println!("❌ Not enough participants for conference");
        return Ok(());
    }

    // Create conference bridge (for simplicity, using pairwise bridges)
    // In a real implementation, you'd use media-core's N-way mixing
    let mut bridges = Vec::new();
    for i in 0..answered_sessions.len() {
        for j in (i + 1)..answered_sessions.len() {
            let bridge_id = coordinator.bridge_sessions(
                &answered_sessions[i].id,
                &answered_sessions[j].id
            ).await?;
            bridges.push(bridge_id);
            println!("🌉 Bridged participants {} and {}", i + 1, j + 1);
        }
    }

    // Monitor conference events
    let events = coordinator.subscribe_to_bridge_events().await;
    tokio::spawn(async move {
        let mut events = events;
        while let Some(event) = events.recv().await {
            match event {
                BridgeEvent::ParticipantAdded { bridge_id, session_id } => {
                    println!("🎤 Participant {} joined bridge {}", session_id, bridge_id);
                }
                BridgeEvent::ParticipantRemoved { bridge_id, session_id, reason } => {
                    println!("🔇 Participant {} left bridge {}: {}", session_id, bridge_id, reason);
                }
                BridgeEvent::BridgeDestroyed { bridge_id } => {
                    println!("🏁 Bridge {} ended", bridge_id);
                }
            }
        }
    });

    println!("🎥 Conference bridge active with {} participants", answered_sessions.len());
    Ok(())
}
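The pairwise bridging used above creates one bridge per unordered pair of participants, so the bridge count grows as n(n-1)/2. The following sketch shows just that enumeration, which is one way to see why N-way mixing is suggested for larger conferences. pairwise_bridges is a hypothetical helper for illustration only.

```rust
/// Returns every unordered pair (i, j) with i < j among `n` participants,
/// in the same order the nested loops in the example visit them.
fn pairwise_bridges(n: usize) -> Vec<(usize, usize)> {
    let mut pairs = Vec::new();
    for i in 0..n {
        for j in (i + 1)..n {
            pairs.push((i, j));
        }
    }
    pairs
}
```

Three participants need 3 bridges, but ten would already need 45, each carrying its own media path.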
The library provides sophisticated session coordination while maintaining simplicity:
Complete state machine with validation:
Session State Flow:
Initializing → Dialing → Ringing → Connected → OnHold → Transferring → Terminating → Terminated
      ↓           ↓         ↓          ↓          ↓           ↓             ↓
  Cancelled   Cancelled Rejected  Terminated   Resumed    Completed    Terminated
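A state flow like the one above is commonly enforced with an enum and a transition check. The sketch below is one possible reading of the diagram, not the crate's actual state machine: State and can_transition_to are hypothetical names, "Resumed" is interpreted as a return from OnHold to Connected, and the "Completed" outcome of a transfer is left out. Only is_final() corresponds to a method that appears in the examples above.

```rust
/// Hypothetical session states, following the diagram above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Initializing,
    Dialing,
    Ringing,
    Connected,
    OnHold,
    Transferring,
    Terminating,
    Terminated,
    Cancelled,
    Rejected,
}

impl State {
    /// Final states accept no further transitions.
    fn is_final(self) -> bool {
        matches!(self, State::Terminated | State::Cancelled | State::Rejected)
    }

    /// Validates a transition against this sketch's reading of the diagram.
    fn can_transition_to(self, next: State) -> bool {
        use State::*;
        matches!(
            (self, next),
            (Initializing, Dialing)
                | (Initializing, Cancelled)
                | (Dialing, Ringing)
                | (Dialing, Cancelled)
                | (Ringing, Connected)
                | (Ringing, Rejected)
                | (Connected, OnHold)
                | (Connected, Terminated)
                | (OnHold, Connected) // "Resumed", as interpreted here
                | (OnHold, Transferring)
                | (Transferring, Terminating)
                | (Terminating, Terminated)
        )
    }
}
```

Listing the allowed pairs explicitly means any transition not named is rejected by default, which is usually the safer failure mode for call state.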
Run the comprehensive test suite:
# Run all tests
cargo test -p rvoip-session-core
# Run integration tests
cargo test -p rvoip-session-core --test '*'
# Run specific test suites
cargo test -p rvoip-session-core session_lifecycle
cargo test -p rvoip-session-core media_integration
cargo test -p rvoip-session-core state_transitions
# Run performance benchmarks
cargo test -p rvoip-session-core --release -- --ignored benchmark
The library includes comprehensive examples demonstrating all features:
# Basic infrastructure setup
cargo run --example 01_basic_infrastructure
# Session lifecycle management
cargo run --example 02_session_lifecycle
# Event handling patterns
cargo run --example 03_event_handling
# Media coordination
cargo run --example 04_media_coordination
# Complete call center example
RUST_LOG=info cargo run --example call_center_demo
# Performance validation
cargo run --release --example performance_validation
The library provides comprehensive error handling with categorized error types:
use rvoip_session_core::errors::SessionError;

match session_result {
    Err(SessionError::InvalidUri(uri)) => {
        log::error!("Invalid SIP URI: {}", uri);
        display_user_error("Please check the phone number format");
    }
    Err(SessionError::ResourceExhausted) => {
        log::warn!("System at capacity");
        attempt_load_balancing().await?;
    }
    Err(SessionError::SessionNotFound(session_id)) => {
        log::info!("Session {} not found, may have terminated", session_id);
        update_ui_call_ended().await;
    }
    Err(other) => {
        // Catch-all so the match stays exhaustive for any other error variant
        log::error!("Unhandled session error: {}", other);
    }
    Ok(session) => {
        // Handle successful session creation
        start_quality_monitoring(&session).await?;
    }
}
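The match above pairs each error category with a different recovery action. One way to keep that mapping testable is to separate "which action" from "perform the action". The sketch below does this with a local stand-in enum: SketchSessionError mirrors only the three variants shown above and is not the crate's SessionError, and Recovery and recovery_for are hypothetical names.

```rust
/// Hypothetical stand-in with the three variants matched in the example.
#[derive(Debug)]
enum SketchSessionError {
    InvalidUri(String),
    ResourceExhausted,
    SessionNotFound(String),
}

/// What the application should do in response to an error.
#[derive(Debug, PartialEq)]
enum Recovery {
    AskUserToFixInput, // bad URI: the caller must correct it
    RetryElsewhere,    // capacity problem: try load balancing
    TreatAsEnded,      // session already gone: update the UI
}

/// Pure mapping from error category to recovery action.
fn recovery_for(err: &SketchSessionError) -> Recovery {
    match err {
        SketchSessionError::InvalidUri(_) => Recovery::AskUserToFixInput,
        SketchSessionError::ResourceExhausted => Recovery::RetryElsewhere,
        SketchSessionError::SessionNotFound(_) => Recovery::TreatAsEnded,
    }
}
```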
Contributions are welcome! Please see the main rvoip contributing guidelines for details.
For session-core specific contributions:
This project is licensed under either of
at your option.