| Crates.io | synap-sdk |
| lib.rs | synap-sdk |
| version | 0.1.0 |
| created_at | 2025-10-23 23:14:39.240026+00 |
| updated_at | 2025-10-23 23:14:39.240026+00 |
| description | Rust SDK for Synap - High-Performance In-Memory Key-Value Store & Message Broker |
| homepage | |
| repository | https://github.com/hivellm/synap |
| max_upload_size | |
| id | 1897776 |
| size | 301,952 |
Official Rust client library for Synap - High-Performance In-Memory Key-Value Store & Message Broker.
futures::Stream for event-driven consumptionAdd this to your Cargo.toml:
[dependencies]
synap-sdk = "0.1"
tokio = { version = "1", features = ["full"] }
use synap_sdk::{SynapClient, SynapConfig};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client
let config = SynapConfig::new("http://localhost:15500");
let client = SynapClient::new(config)?;
// Key-Value operations
client.kv().set("user:1", "John Doe", None).await?;
let value: Option<String> = client.kv().get("user:1").await?;
println!("Value: {:?}", value);
// Queue operations
client.queue().create_queue("tasks", None, None).await?;
let msg_id = client.queue().publish("tasks", b"process-video", Some(9), None).await?;
let message = client.queue().consume("tasks", "worker-1").await?;
if let Some(msg) = message {
println!("Received: {:?}", msg);
client.queue().ack("tasks", &msg.id).await?;
}
// Event Stream (reactive by default)
client.stream().create_room("chat-room-1", None).await?;
client.stream().publish(
"chat-room-1",
"message",
json!({"user": "alice", "text": "Hello!"})
).await?;
// Reactive event consumption
use futures::StreamExt;
let (mut events, handle) = client.stream()
.observe_events("chat-room-1", Some(0), Duration::from_millis(500));
while let Some(event) = events.next().await {
println!("Event {}: {:?}", event.offset, event.data);
if event.offset > 10 { break; }
}
handle.unsubscribe();
// Pub/Sub (reactive by default)
let count = client.pubsub().publish(
"notifications.email",
json!({"to": "user@example.com", "subject": "Welcome"}),
None,
None
).await?;
println!("Delivered to {} subscribers", count);
Ok(())
}
// Set a value
client.kv().set("key", "value", None).await?;
client.kv().set("session", "token", Some(3600)).await?; // with TTL
// Get a value
let value: Option<String> = client.kv().get("key").await?;
let number: Option<i64> = client.kv().get("counter").await?;
// Delete a key
client.kv().delete("key").await?;
// Check existence
let exists = client.kv().exists("key").await?;
// Atomic operations
let new_value = client.kv().incr("counter").await?;
let new_value = client.kv().decr("counter").await?;
// Get statistics
let stats = client.kv().stats().await?;
println!("Total keys: {}", stats.total_keys);
// Create a queue
client.queue().create_queue("tasks", Some(10000), Some(30)).await?;
// Publish a message
let msg_id = client.queue().publish(
"tasks",
b"process-video",
Some(9), // priority (0-9)
Some(3) // max retries
).await?;
// Consume a message
let message = client.queue().consume("tasks", "worker-1").await?;
if let Some(msg) = message {
// Process message
println!("Processing: {:?}", msg);
// Acknowledge (success)
client.queue().ack("tasks", &msg.id).await?;
// Or NACK (requeue)
// client.queue().nack("tasks", &msg.id).await?;
}
// Get queue stats
let stats = client.queue().stats("tasks").await?;
println!("Queue depth: {}", stats.depth);
// List all queues
let queues = client.queue().list().await?;
// Delete a queue
client.queue().delete_queue("tasks").await?;
Event streams are reactive by default - use observe_events() or observe_event() for continuous event consumption.
use futures::StreamExt;
use std::time::Duration;
// Create a stream room
client.stream().create_room("chat-room-1", Some(10000)).await?;
// Publish an event
let offset = client.stream().publish(
"chat-room-1",
"message",
json!({"user": "alice", "text": "Hello!"})
).await?;
// β¨ Reactive: Observe ALL events
let (mut events, handle) = client.stream()
.observe_events("chat-room-1", Some(0), Duration::from_millis(500));
tokio::spawn(async move {
while let Some(event) = events.next().await {
println!("Event {}: {:?}", event.offset, event.data);
}
});
// β¨ Reactive: Observe SPECIFIC event type
let (mut messages, handle2) = client.stream()
.observe_event("chat-room-1", "message", Some(0), Duration::from_millis(500));
while let Some(event) = messages.next().await {
println!("Message: {:?}", event.data);
}
// Stop observing
handle.unsubscribe();
handle2.unsubscribe();
// Get room stats
let stats = client.stream().stats("chat-room-1").await?;
// List all rooms
let rooms = client.stream().list().await?;
// Delete a room
client.stream().delete_room("chat-room-1").await?;
Pub/Sub is reactive by default - use subscribe() for event-driven message consumption.
use std::collections::HashMap;
// Publish to a topic
let delivered_count = client.pubsub().publish(
"notifications.email",
json!({"to": "user@example.com", "subject": "Welcome"}),
Some(5), // priority
None // headers
).await?;
// β¨ Subscribe to topics (with wildcards)
let sub_id = client.pubsub().subscribe_topics(
"user-123", // subscriber ID
vec![
"events.user.*".to_string(), // single-level wildcard
"notifications.#".to_string(), // multi-level wildcard
]
).await?;
// TODO: Reactive subscription (coming soon)
// let (mut messages, handle) = client.pubsub()
// .observe("user-123", vec!["events.*"]);
// Unsubscribe
client.pubsub().unsubscribe("user-123", vec![
"events.user.*".to_string(),
"notifications.#".to_string(),
]).await?;
// List active topics
let topics = client.pubsub().list_topics().await?;
use synap_sdk::SynapConfig;
use std::time::Duration;
let config = SynapConfig::new("http://localhost:15500")
.with_timeout(Duration::from_secs(10))
.with_auth_token("your-api-key")
.with_max_retries(5);
let client = SynapClient::new(config)?;
use synap_sdk::SynapError;
match client.kv().get::<String>("key").await {
Ok(Some(value)) => println!("Found: {}", value),
Ok(None) => println!("Key not found"),
Err(SynapError::HttpError(e)) => eprintln!("HTTP error: {}", e),
Err(SynapError::ServerError(e)) => eprintln!("Server error: {}", e),
Err(e) => eprintln!("Error: {}", e),
}
The SDK now includes RxJS-style reactive patterns via the rx module:
use synap_sdk::rx::{Observable, Subject};
// Observable with operators (like RxJS pipe)
let obs = Observable::from_stream(stream);
obs.filter(|x| *x > 2)
.map(|x| x * 2)
.take(10)
.subscribe_next(|value| {
println!("Value: {}", value);
});
// Subject for multicasting
let subject = Subject::new();
subject.subscribe(|msg| println!("Sub 1: {}", msg));
subject.subscribe(|msg| println!("Sub 2: {}", msg));
subject.next("Hello"); // Both subscribers receive it
See src/rx/README.md for complete guide.
See the examples/ directory for more examples:
basic.rs - Basic KV operationsqueue.rs - Task queue pattern (traditional)reactive_queue.rs - Reactive queue consumption π₯stream.rs - Event stream (traditional)reactive_stream.rs - Reactive event consumption π₯pubsub.rs - Pub/Sub messagingrxjs_style.rs - RxJS-style patterns β NEWRun an example:
cargo run --example basic
cargo run --example queue
cargo run --example reactive_queue # Recommended for queues
cargo run --example reactive_stream # Recommended for streams
cargo run --example rxjs_style # RxJS-style API
cargo run --example pubsub
# Run tests (requires Synap server running on localhost:15500)
cargo test
# Or use a custom server URL
SYNAP_URL=http://localhost:15500 cargo test
MIT License - See LICENSE for details.