Crates.io | rvoip-transaction-core |
lib.rs | rvoip-transaction-core |
version | 0.1.26 |
created_at | 2025-07-03 05:47:10.717899+00 |
updated_at | 2025-08-15 17:49:20.454071+00 |
description | SIP transaction layer for the rvoip stack |
homepage | https://github.com/eisenzopf/rvoip |
repository | https://github.com/eisenzopf/rvoip |
id | 1735846 |
size | 1,433,153 |
The transaction-core library implements the SIP transaction layer as defined in RFC 3261. It provides reliable message delivery and proper state management for SIP request-response exchanges, even over unreliable transports like UDP.
This library follows a message-passing architecture using Tokio's async/await model:
┌───────────────────┐ ┌───────────────────┐
│ Transaction User │ │ SIP Transport │
│ (Application) │◄────┤ (Network Layer) │
└─────────┬─────────┘ └─────────▲─────────┘
│ │
▼ │
┌────────────────────────────────────────────┐
│ │
│ TransactionManager │
│ │
└────────────┬───────────────────┬───────────┘
│ │
┌────────────▼────────┐ ┌────────▼────────────┐
│ Client Transactions │ │ Server Transactions │
└─────────────────────┘ └─────────────────────┘
One of the key architectural principles in SIP is the separation between transaction and dialog layers:
Transaction Layer (this library): Handles individual request-response exchanges with defined lifecycles. Responsible for message reliability, retransmissions, and state tracking for a single exchange.
Dialog Layer (implemented separately): Maintains long-lived application state across multiple transactions. Manages the relationship between endpoints using Call-ID, tags, and sequence numbers.
This separation allows the transaction layer to focus on protocol-level reliability while letting the dialog layer handle the application logic and session state.
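To make the split concrete, here is a minimal sketch of how the two layers might key their state. All type and field names below (`TransactionId`, `DialogId`, `DialogTable`) are hypothetical and are not taken from this library or from session-core; the only grounded part is that a dialog is identified by Call-ID plus tags and carries sequence numbers across several transactions.

```rust
use std::collections::HashMap;

/// Hypothetical identifier for one request-response exchange.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TransactionId {
    branch: String, // branch parameter of the top Via header
    method: String, // method of the request that opened the transaction
}

/// Hypothetical identifier for a long-lived dialog.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DialogId {
    call_id: String,
    local_tag: String,
    remote_tag: String,
}

/// Dialog-layer state that outlives any single transaction.
#[derive(Debug, Default)]
struct Dialog {
    local_cseq: u32,
    transactions: Vec<TransactionId>,
}

#[derive(Debug, Default)]
struct DialogTable {
    dialogs: HashMap<DialogId, Dialog>,
}

impl DialogTable {
    fn new() -> Self {
        Self::default()
    }

    /// Record a new in-dialog request: bump the CSeq counter and remember
    /// which transaction carries it. Returns the CSeq number to use.
    fn start_transaction(&mut self, dialog: &DialogId, method: &str, branch: &str) -> u32 {
        let entry = self.dialogs.entry(dialog.clone()).or_default();
        entry.local_cseq += 1;
        entry.transactions.push(TransactionId {
            branch: branch.to_string(),
            method: method.to_string(),
        });
        entry.local_cseq
    }
}
```

One dialog accumulates many short-lived transactions, which is why the dialog table lives above the transaction layer rather than inside it.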
In the RVOIP stack, the transaction-core library provides the foundation for the session-core library, which implements dialog and session management:
┌─────────────────────────┐
│ Application Layer │
│ (SIP Client/Server) │
└───────────┬─────────────┘
│
┌───────────▼─────────────┐
│ Session Core │
│ (Dialog & Call Sessions)│
└───────────┬─────────────┘
│
┌───────────▼─────────────┐
│ Transaction Core │
│ (Message Reliability) │
└───────────┬─────────────┘
│
┌───────────▼─────────────┐
│ SIP Transport Layer │
│ (UDP, TCP, TLS, etc.) │
└─────────────────────────┘
The relationship between these libraries follows these principles:
This layered approach allows each library to focus on its specific responsibilities while providing a clean API for higher-level application logic.
The library code is organized into several modules:
TransactionManager implementation, the main entry point for applications
The transaction system manages state through a combination of state machines, command channels, and event broadcasting:
Transactions are identified by a TransactionKey containing:
The TransactionManager stores transactions in two separate HashMaps:
client_transactions: For client-side transactions
server_transactions: For server-side transactions
According to RFC 3261 sections 17.1.3 and 17.2.3, transactions are matched as follows:
The TransactionManager implements these rules to route each incoming message to the correct transaction.
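As a rough illustration of the RFC 3261 matching rules, the sketch below looks up responses by the top Via branch plus the CSeq method, and requests by the top Via branch plus the request method, with an ACK mapped onto the INVITE transaction it acknowledges. The `Key` and `Table` types are assumptions made for this example and do not reproduce the library's `TransactionKey`; the sent-by comparison that RFC 3261 section 17.2.3 also requires is left out for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical lookup key: top Via branch plus method.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Key {
    branch: String,
    method: String,
}

impl Key {
    fn new(branch: &str, method: &str) -> Self {
        Self { branch: branch.to_string(), method: method.to_string() }
    }
}

/// Two separate maps, mirroring the client/server split described above.
/// The `u32` values stand in for real transaction handles.
#[derive(Debug, Default)]
struct Table {
    client: HashMap<Key, u32>,
    server: HashMap<Key, u32>,
}

impl Table {
    /// Section 17.1.3: a response matches a client transaction when its top
    /// Via branch and its CSeq method equal those of the original request.
    fn match_response(&self, top_via_branch: &str, cseq_method: &str) -> Option<u32> {
        self.client.get(&Key::new(top_via_branch, cseq_method)).copied()
    }

    /// Section 17.2.3: a request matches a server transaction by branch and
    /// method, except that an ACK matches the INVITE transaction.
    fn match_request(&self, top_via_branch: &str, method: &str) -> Option<u32> {
        let lookup_method = if method == "ACK" { "INVITE" } else { method };
        self.server.get(&Key::new(top_via_branch, lookup_method)).copied()
    }
}
```

A CANCEL deliberately does not match the INVITE it cancels: it shares the branch but carries its own method, so it gets a transaction of its own.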
Each transaction implements a state machine according to RFC 3261:
AtomicTransactionState objects for thread-safe access
The system uses an actor-like pattern where each transaction runs in its own task:
Command Flow:
TransactionManager
TransactionManager sends commands to transactions via command channels
Event Broadcasting:
TransactionEvent objects
subscribe()
Transaction Runner:
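The actor-like pattern can be sketched with nothing but the standard library. This example swaps in `std::thread` and `std::sync::mpsc` for the Tokio tasks and channels the library uses, so that it runs without dependencies; the `Command` and `Event` enums and the `spawn_transaction` function are hypothetical stand-ins, not the library's types.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical commands a manager could send to one transaction.
#[derive(Debug)]
enum Command {
    SendRequest,
    ProcessResponse(u16),
    Terminate,
}

/// Hypothetical events a transaction reports back.
#[derive(Debug, PartialEq)]
enum Event {
    RequestSent,
    Response(u16),
    Terminated,
}

/// Spawn one "transaction runner": it owns its state, reads commands from
/// its private channel, and publishes events on a shared channel.
fn spawn_transaction(events: Sender<Event>) -> (Sender<Command>, thread::JoinHandle<()>) {
    let (cmd_tx, cmd_rx): (Sender<Command>, Receiver<Command>) = mpsc::channel();
    let handle = thread::spawn(move || {
        // The loop ends when every command sender is dropped or on Terminate.
        for cmd in cmd_rx {
            let event = match cmd {
                Command::SendRequest => Event::RequestSent,
                Command::ProcessResponse(code) => Event::Response(code),
                Command::Terminate => {
                    let _ = events.send(Event::Terminated);
                    break;
                }
            };
            if events.send(event).is_err() {
                break; // nobody is listening any more
            }
        }
    });
    (cmd_tx, handle)
}
```

Because each runner is the only code that touches its own state, the manager never needs to lock a transaction; it only needs the command sender.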
The library handles RFC 3261 timers automatically:
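For intuition about what the retransmission timers do, the following sketch computes the gaps between successive sends of a request over an unreliable transport, following RFC 3261: Timer A doubles from T1 with no cap, Timer E doubles from T1 but is capped at T2, and both stop once 64 × T1 (Timer B or Timer F) has elapsed. The function name `retransmit_intervals` is made up for this example and says nothing about how the library schedules its timers internally.

```rust
use std::time::Duration;

/// Gaps between successive sends of a request in the Calling/Trying state.
/// Pass `None` as `cap` for Timer A (INVITE) or `Some(t2)` for Timer E
/// (non-INVITE). Sending stops when the 64 * T1 timeout would be reached.
fn retransmit_intervals(t1: Duration, cap: Option<Duration>) -> Vec<Duration> {
    let deadline = t1 * 64; // Timer B / Timer F
    let mut intervals = Vec::new();
    let mut interval = t1;
    let mut elapsed = Duration::ZERO;
    while elapsed + interval < deadline {
        intervals.push(interval);
        elapsed += interval;
        interval *= 2;
        if let Some(max) = cap {
            interval = interval.min(max);
        }
    }
    intervals
}
```

With the default T1 of 500 ms this yields six INVITE retransmissions (0.5 s, 1 s, 2 s, 4 s, 8 s, 16 s) and ten non-INVITE retransmissions (0.5 s, 1 s, 2 s, then 4 s repeated) before the transaction times out at 32 s.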
The system provides specialized handling for certain SIP methods:
CANCEL Handling:
method/cancel.rs validate and create CANCEL requests
ACK Handling:
create_ack_for_2xx method helps TUs generate correct ACK requests
UPDATE Support:
The library includes an extensive test suite covering various aspects of RFC 3261 compliance:
All tests run serially with comprehensive logging to aid in debugging and understanding the protocol flow.
INVITE client transaction:
                         Timer A             Timer B
                         (Resend)            (Timeout)
                            |                    |
                            V                    V
Initial ----> Calling ---------> Terminated
                 | \
             1xx |  \ 2xx
                 |   \
                 V    \
             Proceeding --┘
                 |
          3xx-6xx|
                 V
             Completed ---------> Terminated
                        Timer D
Non-INVITE client transaction:
                         Timer E             Timer F
                         (Resend)            (Timeout)
                            |                    |
                            V                    V
Initial ----> Trying ----------> Terminated
                 | \
             1xx |  \ 2xx-6xx
                 |   \
                 V    \
             Proceeding \
                 |       |
          2xx-6xx|       |
                 V       V
             Completed ---------> Terminated
                        Timer K
INVITE server transaction:
              INVITE
                |
                V
            Proceeding --------> Terminated
              /    \             (2xx sent)
             /      \
        2xx /        \ 3xx-6xx
           /          \
          V            V
    Terminated      Completed
                     /  |  \
                    /   |   \
               ACK / TimerG  \ Timer H
                  /  (Resend) \(Timeout)
                 V             V
            Confirmed ----------> Terminated
                 \
                  \ Timer I
                   \
                    V
                Terminated
Non-INVITE server transaction:
             Request
                |
                V
              Trying
              /    \
         1xx /      \ 2xx-6xx
            /        \
           V          V
    Proceeding      Completed
        |          /    |
 2xx-6xx|         /     |
        |        /      |
        V       V       V
    Completed       Terminated
        |               ^
 Timer J|               |
        |              /
        V             /
    Terminated ----/
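The INVITE client transaction diagram above (the one driven by Timers A, B, and D) can be written as a small transition function. This is a minimal sketch based on RFC 3261 section 17.1.1, not the library's implementation: `State`, `Input`, `next_state`, and `AtomicState` are hypothetical names, and the `AtomicU8` wrapper only illustrates the general idea of sharing a state across threads without a lock. Timer A retransmissions and repeated final responses do not change state, so they show up here as "no transition".

```rust
use std::sync::atomic::{AtomicU8, Ordering};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum State {
    Initial = 0,
    Calling = 1,
    Proceeding = 2,
    Completed = 3,
    Terminated = 4,
}

#[derive(Debug, Clone, Copy)]
enum Input {
    RequestSent,
    Response(u16),
    TimerB,
    TimerD,
}

/// Returns the next state, or `None` when the input causes no transition.
fn next_state(state: State, input: Input) -> Option<State> {
    use State::*;
    match (state, input) {
        (Initial, Input::RequestSent) => Some(Calling),
        (Calling, Input::Response(100..=199)) => Some(Proceeding),
        (Calling | Proceeding, Input::Response(200..=299)) => Some(Terminated),
        (Calling | Proceeding, Input::Response(300..=699)) => Some(Completed),
        (Calling, Input::TimerB) => Some(Terminated),
        (Completed, Input::TimerD) => Some(Terminated),
        _ => None,
    }
}

/// Lock-free holder for a `State`, stored as its `u8` discriminant.
struct AtomicState(AtomicU8);

impl AtomicState {
    fn new(state: State) -> Self {
        Self(AtomicU8::new(state as u8))
    }

    fn load(&self) -> State {
        match self.0.load(Ordering::Acquire) {
            0 => State::Initial,
            1 => State::Calling,
            2 => State::Proceeding,
            3 => State::Completed,
            _ => State::Terminated,
        }
    }

    /// Apply an input; returns true only if a transition was performed.
    fn apply(&self, input: Input) -> bool {
        let current = self.load();
        match next_state(current, input) {
            Some(next) => self
                .0
                .compare_exchange(current as u8, next as u8, Ordering::AcqRel, Ordering::Acquire)
                .is_ok(),
            None => false,
        }
    }
}
```

The compare-and-exchange means a transition is applied only if no other thread changed the state in between, which is one way to keep illegal transitions out even under concurrency.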
use std::sync::Arc;
use tokio::sync::mpsc;

// Create a transport implementation
let transport = Arc::new(YourTransportImplementation::new());
// Create channels for transport events
let (transport_tx, transport_rx) = mpsc::channel(100);
// Initialize the transaction manager.
// events_rx must be mutable because recv() takes &mut self.
let (manager, mut events_rx) = TransactionManager::new(
    transport,
    transport_rx,
    Some(100), // Event channel capacity
).await.unwrap();
// Create a SIP request
let request = RequestBuilder::new(Method::Register, "sip:example.com")?
    .from("Alice", "sip:alice@atlanta.com", Some("tag123"))
    .to("Bob", "sip:bob@biloxi.com", None)
    .call_id("callid123@atlanta.com")
    .cseq(1)
    .build();
// Create and initiate a client transaction
let tx_id = manager.create_client_transaction(request, remote_addr).await?;
manager.send_request(&tx_id).await?;
// Process events from the events_rx channel
while let Some(event) = events_rx.recv().await {
    match event {
        TransactionEvent::ProvisionalResponse { response, .. } => {
            println!("Received 1xx response: {}", response.status_code());
        },
        TransactionEvent::SuccessResponse { transaction_id, response, .. } => {
            println!("Received 2xx response: {}", response.status_code());
            // For INVITE, handle ACK for 2xx responses at the TU level.
            // `is_invite` is a flag the caller tracks for this transaction.
            if is_invite {
                let ack_request = manager.create_ack_for_2xx(&transaction_id, &response).await?;
                // Send the ACK (typically via a new transport method or direct send)
            }
        },
        TransactionEvent::FailureResponse { response, .. } => {
            println!("Received 3xx-6xx response: {}", response.status_code());
        },
        TransactionEvent::TransactionTerminated { transaction_id, .. } => {
            println!("Transaction terminated: {}", transaction_id);
        },
        // Handle other events
        _ => {}
    }
}
// Handle incoming requests via transport_tx events
transport_tx.send(TransportEvent::MessageReceived {
    message: Message::Request(request),
    source: remote_addr,
    destination: local_addr,
}).await?;
// Process events to get the transaction ID of the new server transaction
let tx_id = match events_rx.recv().await.unwrap() {
    TransactionEvent::NewRequest { transaction_id, request, source, .. } => {
        println!("New request: {}", request.method());
        // Create a server transaction
        let server_tx = manager.create_server_transaction(
            request.clone(),
            source
        ).await.expect("Failed to create server transaction");
        server_tx.id().clone()
    },
    _ => panic!("Expected NewRequest event"),
};
// Send a response through the transaction
let response = ResponseBuilder::new(StatusCode::Ok, Some("OK"))?
    // Add headers...
    .build();
manager.send_response(&tx_id, response).await?;
// To cancel an ongoing INVITE transaction:
let cancel_tx_id = manager.cancel_invite_transaction(&invite_tx_id).await?;
// The cancel_invite_transaction method handles:
// 1. Creating the correct CANCEL request
// 2. Creating a new transaction for the CANCEL
// 3. Sending the CANCEL request
// 4. Managing the relationship between the INVITE and CANCEL transactions
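To show what "creating the correct CANCEL request" involves, the sketch below derives a CANCEL from an INVITE according to RFC 3261 section 9.1: the Request-URI, Call-ID, To, From, and numeric CSeq are copied unchanged, the method becomes CANCEL, and only the topmost Via of the INVITE is kept so the branch matches. `SimpleRequest` and `cancel_for` are hypothetical and deliberately simplified; they are not the types used by `cancel_invite_transaction`.

```rust
/// Hypothetical, heavily simplified request with only the fields that
/// matter for building a CANCEL.
#[derive(Debug, Clone, PartialEq)]
struct SimpleRequest {
    method: String,
    uri: String,
    call_id: String,
    from: String,
    to: String,
    cseq: u32,
    via: Vec<String>, // topmost Via first
}

/// Build the CANCEL for a pending INVITE, or explain why that is not possible.
fn cancel_for(invite: &SimpleRequest) -> Result<SimpleRequest, String> {
    if invite.method != "INVITE" {
        return Err(format!("cannot CANCEL a {} request", invite.method));
    }
    let top_via = invite
        .via
        .first()
        .ok_or_else(|| "INVITE has no Via header".to_string())?
        .clone();
    Ok(SimpleRequest {
        method: "CANCEL".to_string(),
        uri: invite.uri.clone(),
        call_id: invite.call_id.clone(),
        from: invite.from.clone(),
        to: invite.to.clone(),
        cseq: invite.cseq,  // same number, only the CSeq method changes
        via: vec![top_via], // single Via with the INVITE's branch
    })
}
```

Reusing the INVITE's branch is what lets the server find the transaction to cancel, while the different method keeps the CANCEL in a transaction of its own.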
// When the server transaction has sent a non-2xx response to an INVITE,
// the client will send an ACK directly to the server transaction.
// This ACK needs to be processed by the server transaction:
// Assuming you have the server transaction ID and the ACK request:
let server_invite_tx_id = /* ID of the INVITE server transaction */;
let ack_request = /* The ACK request received from the client */;
// Process the ACK in the server transaction
manager.process_request(&server_invite_tx_id, ack_request).await?;
// This will cause the server INVITE transaction to transition to Confirmed state,
// and eventually to Terminated after Timer I expires.
The library provides specific error types for different failure scenarios:
Error::TransactionNotFound: When a transaction ID doesn't exist
Error::InvalidStateTransition: For illegal state transitions
Error::TransactionTimeout: When a transaction times out
Error::TransportError: For network-related failures
Transaction timers can be configured through the TimerSettings struct:
let custom_timers = TimerSettings {
    t1: Duration::from_millis(500),               // Base timer value
    t2: Duration::from_secs(4),                   // Max retransmit interval
    transaction_timeout: Duration::from_secs(32), // Client transaction timeout
    wait_time_d: Duration::from_secs(32),         // Timer D duration
    wait_time_h: Duration::from_secs(32),         // Timer H duration
    wait_time_i: Duration::from_secs(5),          // Timer I duration
    wait_time_j: Duration::from_secs(32),         // Timer J duration
    wait_time_k: Duration::from_secs(5),          // Timer K duration
};
let (manager, events_rx) = TransactionManager::new_with_config(
    transport,
    transport_rx,
    Some(100),           // Event channel capacity
    Some(custom_timers), // Custom timer settings
).await?;
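The values in the example above are not independent: in RFC 3261 most of them derive from T1 (64 × T1 for the long waits) and T4 (for Timers I and K). The sketch below makes that relationship explicit with a stand-alone mirror struct. `SketchTimerSettings` and `from_base` are hypothetical; the field names are copied from the example above, but this is not the library's `TimerSettings` type, and whether the library offers a similar constructor is not something this sketch assumes.

```rust
use std::time::Duration;

/// Stand-alone mirror of the fields shown above, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct SketchTimerSettings {
    t1: Duration,
    t2: Duration,
    transaction_timeout: Duration,
    wait_time_d: Duration,
    wait_time_h: Duration,
    wait_time_i: Duration,
    wait_time_j: Duration,
    wait_time_k: Duration,
}

impl SketchTimerSettings {
    /// Derive every timer from the three RFC 3261 base values.
    /// Note: RFC 3261 asks for Timer D to be at least 32 s on UDP, so
    /// shrinking T1 like this is only appropriate for fast-running tests.
    fn from_base(t1: Duration, t2: Duration, t4: Duration) -> Self {
        let long_wait = t1 * 64;
        Self {
            t1,
            t2,
            transaction_timeout: long_wait, // Timers B and F
            wait_time_d: long_wait,
            wait_time_h: long_wait,
            wait_time_i: t4,
            wait_time_j: long_wait,
            wait_time_k: t4,
        }
    }
}
```

Calling `from_base` with 500 ms, 4 s, and 5 s reproduces the values listed above, while a 10 ms T1 gives 640 ms timeouts that keep a test suite quick.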
events_rx channel
TransactionTerminated events to avoid resource leaks
cleanup_terminated_transactions periodically for long-running applications
subscribe() method when implementing multiple consumers
transaction_state() before critical operations
rvoip-sip-core: Provides SIP message types, parsing, and protocol definitions
rvoip-sip-transport: Transport layer for message delivery and network I/O
tokio: Async runtime for concurrent transaction processing
async-trait: Async trait support for transport abstractions
uuid: Transaction ID generation and uniqueness
rand: Random number generation for timer variations
tracing: Structured logging and debugging support
┌─────────────────────────────────────────┐
│ Application Layer │
├─────────────────────────────────────────┤
│ rvoip-session-core │
├─────────────────────────────────────────┤
│ rvoip-transaction-core ⬅️ YOU ARE HERE
├─────────────────────────────────────────┤
│ rvoip-sip-transport │
├─────────────────────────────────────────┤
│ Network Layer │
└─────────────────────────────────────────┘
The transaction layer serves as the reliability layer in the SIP stack, providing:
Run the comprehensive test suite:
# Run all tests
cargo test -p rvoip-transaction-core
# Run with detailed logging
RUST_LOG=debug cargo test -p rvoip-transaction-core
# Run specific test categories
cargo test -p rvoip-transaction-core --test integration_tests
cargo test -p rvoip-transaction-core client_transaction
cargo test -p rvoip-transaction-core server_transaction
cargo test -p rvoip-transaction-core cancel_transaction
# Run tests serially (recommended for debugging)
cargo test -p rvoip-transaction-core -- --test-threads=1
The library includes extensive test coverage for:
All tests are designed to run deterministically with comprehensive logging for debugging.
See TODO.md for a comprehensive roadmap including:
Contributions are welcome! Please see the main rvoip contributing guidelines for details on:
For transaction-core specific contributions:
This project is licensed under either of
at your option.
Transaction Management
INVITE Transaction Support
Non-INVITE Transaction Support
Special Method Handling
Timer Management
Error Handling & Reliability
Event System
Performance Optimizations
Advanced Reliability
Monitoring & Diagnostics
Enhanced Integration