| Crates.io | lightstreamer-rs |
| lib.rs | lightstreamer-rs |
| version | 0.2.4 |
| created_at | 2025-05-16 16:43:15.420141+00 |
| updated_at | 2025-12-16 19:08:04.127316+00 |
| description | A Rust client for Lightstreamer, designed to facilitate real-time communication with Lightstreamer servers. |
| homepage | https://github.com/joaquinbejar/lightstreamer-rs |
| repository | https://github.com/joaquinbejar/lightstreamer-rs |
| max_upload_size | |
| id | 1676706 |
| size | 564,369 |
This project is a Rust implementation of the Lightstreamer TLCP (Text-based Live Connections Protocol). It provides a robust client SDK to interact with Lightstreamer servers, enabling real-time data streaming for financial applications, IoT systems, and other use cases requiring live data updates. While it was initially developed to support the ig_trading_api project, it has evolved into a more comprehensive SDK with broader applicability.
Lightstreamer is a high-performance real-time messaging server that provides several key features:
This Rust client SDK provides the following capabilities:
Connection Management:
Subscription Capabilities:
Configuration Options:
Event Handling:
The current implementation supports most core features of the Lightstreamer protocol, with a focus on the WebSocket transport mechanism and the MERGE subscription mode. While initially developed for specific trading API requirements, the library has expanded to include:
Some advanced features that may be implemented in future versions include:
To use this SDK in your Rust project, add the following dependency to your Cargo.toml:
[dependencies]
lightstreamer-rs = "0.1.4"
For most use cases, use the simplified SimpleClient API:
use lightstreamer_rs::client::{ClientConfig, SimpleClient, SubscriptionParams};
use lightstreamer_rs::subscription::SubscriptionMode;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. Create configuration
let config = ClientConfig::new("http://push.lightstreamer.com/lightstreamer")
.adapter_set("DEMO");
// 2. Create client
let client = SimpleClient::new(config)?;
// 3. Subscribe and get channel receiver
let params = SubscriptionParams::new(
SubscriptionMode::Merge,
vec!["item1".to_string(), "item2".to_string()],
vec!["last_price".to_string(), "time".to_string()],
).data_adapter("QUOTE_ADAPTER");
let mut receiver = client.subscribe(params).await?;
// 4. Process updates asynchronously
tokio::spawn(async move {
while let Some(update) = receiver.recv().await {
println!("Price: {:?}", update.get_value("last_price"));
}
});
// 5. Connect and run
client.connect().await?;
Ok(())
}
Here's a comprehensive example of how to use the Lightstreamer Rust Client SDK:
// This example shows how to use the Lightstreamer Rust client
use lightstreamer_rs::client::{LightstreamerClient, Transport};
use lightstreamer_rs::subscription::{Subscription, SubscriptionMode, SubscriptionListener, ItemUpdate};
use std::sync::Arc;
use tokio::sync::Notify;
use std::time::Duration;
// Define a custom subscription listener
struct MySubscriptionListener;
impl SubscriptionListener for MySubscriptionListener {
fn on_subscription(&self) {
info!("Subscription confirmed by the server");
}
fn on_item_update(&self, update: ItemUpdate) {
info!("Received update for item: {}", update.get_item_name());
for field in update.get_fields() {
if let Some(value) = update.get_value(field) {
info!(" {} = {}", field, value);
}
}
}
}
async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create a new LightstreamerClient instance
let result = LightstreamerClient::new(
Some("ws://your-lightstreamer-server.com"), // Server address
Some("YOUR_ADAPTER_SET"), // Adapter set
None, // User (optional)
None, // Password (optional)
);
let mut client = match result {
Ok(client) => client,
Err(e) => return Err(e),
};
// Configure the connection details if needed
client.connection_details.set_user("YOUR_USERNAME");
client.connection_details.set_password("YOUR_PASSWORD");
// Configure connection options if needed
client.connection_options.set_content_length(50000000);
client.connection_options.set_keepalive_interval(5);
client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
// Create a shutdown signal for graceful termination
let shutdown_signal = Arc::new(Notify::new());
// Connect to the Lightstreamer server
if let Err(e) = client.connect(shutdown_signal.clone()).await {
return Err(e);
}
// Create a subscription
let subscription_result = Subscription::new(
SubscriptionMode::Merge,
Some(vec!["item1".to_string(), "item2".to_string()]),
Some(vec!["field1".to_string(), "field2".to_string()]),
);
let subscription = match subscription_result {
Ok(sub) => sub,
Err(e) => return Err(e),
};
// Add a listener to the subscription (optional)
let listener = Box::new(MySubscriptionListener);
subscription.add_listener(listener);
// Get the subscription sender from the client
// Note: This method might not exist in the current API, check the documentation
let subscription_sender = client.get_subscriptions()[0].clone();
// Subscribe and get the subscription ID
let subscription_id_result = LightstreamerClient::subscribe_get_id(
subscription_sender.clone(),
subscription
).await;
let subscription_id = match subscription_id_result {
Ok(id) => id,
Err(e) => return Err(e),
};
info!("Subscribed with ID: {}", subscription_id);
// Wait for some time (in a real application, you would wait for a shutdown signal)
tokio::time::sleep(Duration::from_secs(5)).await;
// Unsubscribe before disconnecting
LightstreamerClient::unsubscribe(subscription_sender, subscription_id).await;
Ok(())
}
You can also add listeners to handle client events:
Here's an example of implementing a client listener:
// Note: ClientListener is a private trait, this is just for illustration
use lightstreamer_rs::client::model::ClientStatus;
struct MyClientListener;
// This is just an example of what the ClientListener trait might look like
// The actual implementation is internal to the library
trait ClientListener {
fn on_status_change(&self, status: &ClientStatus);
fn on_server_error(&self, error_code: i32, error_message: &str);
fn on_property_change(&self, property: &str);
}
impl ClientListener for MyClientListener {
fn on_status_change(&self, status: &ClientStatus) {
info!("Client status changed to: {:?}", status);
}
fn on_server_error(&self, error_code: i32, error_message: &str) {
info!("Server error: {} - {}", error_code, error_message);
}
fn on_property_change(&self, property: &str) {
info!("Property changed: {}", property);
}
}
// Then add the listener to your client
// client.add_listener(Box::new(MyClientListener));
For asynchronous processing of updates in separate tasks, you can use the ChannelSubscriptionListener:
use lightstreamer_rs::subscription::ChannelSubscriptionListener;
// Create a channel-based listener
let (listener, mut update_receiver) = ChannelSubscriptionListener::create_channel();
// Add the listener to your subscription
subscription.add_listener(Box::new(listener));
// Process updates asynchronously in a separate task
tokio::spawn(async move {
while let Some(update) = update_receiver.recv().await {
// Process the update
println!("Item: {:?}", update.get_item_name());
println!("Fields: {:?}", update.get_fields());
// Perform any async operations
// e.g., save to database, send to another service, etc.
}
});
This approach allows you to:
Decouple update reception from processing
Process updates asynchronously without blocking the Lightstreamer event loop
Easily integrate with other async workflows
Handle backpressure naturally through channel buffering
We welcome contributions to this project! If you would like to contribute, please follow these steps:
If you have any questions, issues, or would like to provide feedback, please feel free to contact the project maintainer:
Joaquín Béjar García
We appreciate your interest and look forward to your contributions!
Licensed under MIT license