| Crates.io | asterisk-manager |
| lib.rs | asterisk-manager |
| version | 2.1.0 |
| created_at | 2025-01-22 04:31:05.691405+00 |
| updated_at | 2025-09-06 23:46:52.94136+00 |
| description | An asynchronous Rust library for interacting with the Asterisk Manager Interface (AMI), featuring a strongly-typed, stream-based API with resilient connections, automatic reconnection, and heartbeat monitoring. |
| homepage | |
| repository | https://github.com/gabriellramos/rust-asterisk-manager |
| max_upload_size | |
| id | 1526326 |
| size | 173,696 |
A modern, asynchronous, strongly-typed, and stream-based library for interacting with the Asterisk Manager Interface (AMI) in Rust.
This crate simplifies communication with AMI by handling connection, authentication, sending actions, and consuming events in an idiomatic Rust way, using Tokio and a type system that helps prevent errors at compile time. For production applications, it includes an optional resilient module with heartbeat monitoring, automatic reconnection, and fault-tolerant event streaming.
- Actions, events, and responses are modeled as Rust enums and structs, improving type safety and enabling editor autocompletion.
- The send_action method automatically detects actions that return lists of events (e.g., PJSIPShowEndpoints). It transparently collects all related events and bundles them into a single, aggregated AmiResponse.
- Events are consumed through the Stream abstraction from tokio_stream.
- An optional docs feature flag enables utoipa::ToSchema implementations on all public types, allowing for automatic and accurate API documentation generation.
- The AmiError enum allows robust handling of different failure scenarios.
Add asterisk-manager to your Cargo.toml.
[dependencies]
asterisk-manager = "2.1.0" # Or the latest version
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
To enable automatic API documentation support for frameworks like Actix-web or Axum, enable the docs feature:
[dependencies]
asterisk-manager = { version = "2.1.0", features = ["docs"] }
This example connects to AMI, sends a simple action (Ping), and then sends a list-based action (PJSIPShowEndpoints), demonstrating how send_action handles both transparently.
use asterisk_manager::{Manager, ManagerOptions, AmiAction};
use tokio_stream::StreamExt;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let options = ManagerOptions {
        port: 5038,
        host: "127.0.0.1".to_string(),
        username: "admin".to_string(),
        password: "password".to_string(),
        events: true,
    };
    let mut manager = Manager::new();
    manager.connect_and_login(options).await?;
    println!("Successfully connected to AMI!");
    // Send a simple action
    println!("\n--- Sending a simple action (Ping) ---");
    let ping_action = AmiAction::Ping { action_id: None };
    let ping_response = manager.send_action(ping_action).await?;
    println!("Ping Response: {:?}", ping_response);
    // Send a list-based action
    println!("\n--- Sending a list-based action (PJSIPShowEndpoints) ---");
    let list_action = AmiAction::Custom {
        action: "PJSIPShowEndpoints".to_string(),
        params: HashMap::new(),
        action_id: None,
    };
    let list_response = manager.send_action(list_action).await?;
    if let Some(events) = list_response.fields.get("CollectedEvents") {
        println!("Collected {} events.", events.as_array().map_or(0, |a| a.len()));
        // println!("Collected Events JSON: {}", events);
    }
    manager.disconnect().await?;
    println!("\nDisconnected.");
    Ok(())
}
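To picture what the list handling in the example above involves: in the AMI protocol, a list action is answered by an initial response, then a series of events carrying the same ActionID, and finally an event marked EventList: Complete. The sketch below collects such events from an in-memory sequence. It is only an illustration of the idea, not the crate's implementation; Message, msg, and collect_list are hypothetical names, and dropping the completion marker is a choice made for this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one parsed AMI message (not a type from the crate).
type Message = HashMap<String, String>;

/// Builds a message from key/value pairs.
fn msg(pairs: &[(&str, &str)]) -> Message {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// Collects the events that belong to `action_id`, stopping at the event that
/// carries `EventList: Complete`. Unrelated traffic and the initial response
/// (which has no `Event` key) are skipped; the completion marker is not kept.
fn collect_list<I>(incoming: I, action_id: &str) -> Vec<Message>
where
    I: IntoIterator<Item = Message>,
{
    let mut collected = Vec::new();
    for m in incoming {
        let same_action = m.get("ActionID").map(String::as_str) == Some(action_id);
        if !same_action || !m.contains_key("Event") {
            continue;
        }
        let complete = m
            .get("EventList")
            .map_or(false, |v| v.eq_ignore_ascii_case("Complete"));
        if complete {
            break;
        }
        collected.push(m);
    }
    collected
}
```

Feeding it a response, two EndpointList events interleaved with an unrelated event, and an EndpointListComplete event would yield just the two EndpointList events.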
The Manager struct is the main entry point. You first create an empty Manager with Manager::new() and then establish a connection with manager.connect_and_login(options).await. This method starts the internal, non-blocking I/O tasks. Manager is Clone, Send, and Sync, allowing it to be safely shared between multiple tasks.
Use the manager.send_action() method. It handles different types of responses:
- For simple actions like Ping, it returns the direct response immediately.
- For list-based actions (e.g., PJSIPShowEndpoints), it automatically collects all related events, bundles them into a CollectedEvents field in the final AmiResponse, and returns after the list is complete.
The library uses a broadcast channel to fan out events. To consume them, obtain a stream with manager.all_events_stream().await. This allows multiple parts of your application to independently listen to the same AMI events.
let mut stream = manager.all_events_stream().await;
while let Some(Ok(event)) = stream.next().await {
    println!("Event received: {:?}", event);
}
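The fan-out idea behind the event stream can be sketched with the standard library alone: every subscriber gets its own receiver and sees a copy of every event. This is a simplified, synchronous illustration built on std::sync::mpsc, not the broadcast channel the crate uses; FanOut, subscribe, and publish are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: each subscriber receives its own copy of every event.
struct FanOut<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Registers a new listener and returns its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `event` to every subscriber and forgets the ones
    /// whose receiver has been dropped.
    fn publish(&mut self, event: T) {
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
    }
}
```

Two calls to subscribe() followed by one publish() leave the same event waiting on both receivers, which is the property that lets independent parts of an application listen to the same AMI events.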
This library provides optional support for utoipa to automatically generate OpenAPI (Swagger) schemas for your web application.
This feature is available starting from version 2.0.0.
To enable it, add the docs feature in your Cargo.toml:
asterisk-manager = { version = "2.1.0", features = ["docs"] }
With this feature enabled, all public types like AmiResponse and AmiEvent will derive utoipa::ToSchema. You can then reference them directly in your Actix-web or Axum controller documentation.
// In your application's controller
use asterisk_manager::AmiResponse;
use utoipa::OpenApi;
#[derive(OpenApi)]
#[openapi(
    //...
    components(
        schemas(AmiResponse) // This now works directly!
    )
)]
pub struct ApiDoc;
For production applications that require high availability and fault tolerance, the library provides a resilient module with automatic reconnection, heartbeat monitoring, and infinite event streams.
Configuration notes:
- heartbeat_interval (seconds): controls how often a heartbeat Ping is sent when using the resilient module. Lower values mean faster failure detection but more AMI traffic.
- max_retries: number of immediate reconnection attempts before adding 5-second delays in cycles. The logic works in cycles: first max_retries attempts are immediate, then one attempt with a 5-second delay, then the cycle repeats. Tune this depending on how aggressive you want reconnection attempts to be.
- metrics: optional ResilientMetrics instance for collecting reconnection statistics (attempts, successes, failures, timing). Set to Some(ResilientMetrics::new()) to enable metrics collection.
use asterisk_manager::resilient::{ResilientOptions, connect_resilient, infinite_events_stream};
use asterisk_manager::ManagerOptions;
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resilient_options = ResilientOptions {
        manager_options: ManagerOptions {
            port: 5038,
            host: "127.0.0.1".to_string(),
            username: "admin".to_string(),
            password: "password".to_string(),
            events: true,
        },
        buffer_size: 2048,
        enable_heartbeat: true,
        enable_watchdog: true,
        heartbeat_interval: 30,
        watchdog_interval: 1,
        max_retries: 3,
        // Assumption: `metrics` is a field of ResilientOptions, as the
        // configuration notes suggest; None leaves metrics collection off.
        metrics: None,
    };
    // Option 1: Connect with resilient features
    let manager = connect_resilient(resilient_options.clone()).await?;
    // Option 2: Create an infinite event stream that handles reconnection
    let mut event_stream = infinite_events_stream(resilient_options).await?;
    tokio::pin!(event_stream);
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => println!("Received event: {:?}", event),
            Err(e) => println!("Error receiving event: {:?}", e),
        }
    }
    Ok(())
}
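The max_retries cycle described in the configuration notes can be written down as a small pure function. This is one reading of that description, not code taken from the crate; delay_before_attempt is a hypothetical name and attempts are counted from zero.

```rust
use std::time::Duration;

/// Delay to wait before the given reconnection attempt (0-based), following
/// the documented cycle: `max_retries` immediate attempts, then one attempt
/// preceded by a 5-second delay, then the cycle repeats.
fn delay_before_attempt(attempt: u32, max_retries: u32) -> Duration {
    // One cycle is `max_retries` immediate attempts plus one delayed attempt.
    let cycle_len = max_retries.saturating_add(1);
    if attempt % cycle_len == max_retries {
        Duration::from_secs(5)
    } else {
        Duration::ZERO
    }
}
```

Under this reading, max_retries: 3 gives the delay sequence 0, 0, 0, 5, 0, 0, 0, 5 seconds for the first eight attempts, and max_retries: 0 delays every attempt by 5 seconds.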
The core library does not include built-in automatic reconnection logic by design. The philosophy is to provide robust building blocks so you can implement the reconnection strategy that best fits your application (e.g., fixed delays, exponential backoff, fixed number of attempts, etc.).
When the connection is lost, methods like send_action will return an AmiError::ConnectionClosed, and the event stream will end. Your application should handle these signals by creating a new Manager instance and calling connect_and_login again.
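As an illustration of one such strategy, here is a minimal exponential backoff sketch. It is deliberately synchronous and independent of the crate so that it stays self-contained: connect stands in for "create a new Manager and call connect_and_login", and sleep for whatever delay primitive the application uses (in a Tokio application that would typically be tokio::time::sleep inside an async loop). Both function names are hypothetical.

```rust
use std::time::Duration;

/// Exponential backoff: `base * 2^attempt`, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Calls `connect` until it succeeds or `max_attempts` attempts have failed,
/// calling `sleep` with the backoff delay between failed attempts.
/// At least one attempt is always made; the last error is returned on failure.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut connect: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt: u32 = 0;
    loop {
        match connect() {
            Ok(value) => return Ok(value),
            // Out of attempts: give up and surface the last error.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

With a base of 1 second and a cap of 30 seconds, the delays between failed attempts grow as 1, 2, 4, 8, 16, 30, 30, ... seconds. Passing sleep as a closure keeps the retry logic testable without actually waiting.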
For applications that need automatic reconnection without manual implementation, use the resilient module described in the previous section. This provides production-ready reconnection logic with heartbeat monitoring and automatic retry mechanisms.
For complete examples of both approaches, see:
- examples/actix_web_example.rs - Manual reconnection handling
- examples/actix_web_resilient_example.rs - Automatic reconnection
Contributions are very welcome! If you find a bug, have a suggestion for improvement, or want to add support for more actions and events, feel free to open an Issue or a Pull Request.
This project is licensed under the MIT License.
This work was inspired by the simplicity and effectiveness of the following libraries: