| Crates.io | prk_eventbus |
| lib.rs | prk_eventbus |
| version | 0.1.2 |
| created_at | 2025-04-10 09:34:50.941688+00 |
| updated_at | 2025-04-10 12:59:24.06796+00 |
| description | A lightweight, WebSocket-based event bus for Rust with SQLite persistence |
| homepage | https://github.com/prk-Jr/prk_eventbus |
| repository | https://github.com/prk-Jr/prk_eventbus |
| max_upload_size | |
| id | 1627957 |
| size | 172,365 |
The Event Bus Service is a versatile, WebSocket-based event bus built in Rust using the prk_eventbus library. It enables asynchronous, decoupled communication for distributed systems or within a single application, with SQLite-backed persistent storage. Whether embedded in an Axum server, run as a standalone service, or used for simple pub-sub scenarios, it supports robust publish-subscribe patterns with features like event batching, TTL, and acknowledgment.
cargo).sqlx; no separate installation needed.Clone the Repository:
git clone https://github.com/prk-Jr/prk_eventbus.git
cd prk_eventbus
Install Dependencies:
cargo build
Run the Service:
cargo run (see Standalone Usage).serve (standalone) or SocketAddr (Axum) (e.g., "127.0.0.1:3000").SQLiteStorage::new (e.g., "eventbus.db")./ws; customize via axum_router nesting.ClientConfig (e.g., reconnect_interval, max_retries).Run the event bus as a standalone server with a publisher and subscriber in one process, ideal for testing or simple applications.
ws://127.0.0.1:3000/ws.chat.user1.chat.*, acknowledges messages, and times out after 3 seconds.use std::time::Duration;
use prk_eventbus::{adapters::{WsConfig, WsTransport}, client::{ClientConfig, EventBusClient}, core::error::EventBusError, storage::dummy_storage::NoStorage};
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() -> Result<(), EventBusError> {
#[cfg(feature = "tracing")]
{
tracing_subscriber::fmt()
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()))
.init();
}
let server_handle: JoinHandle<()> = tokio::spawn(async {
let ws_config = WsConfig {
channel_capacity: 1000,
auto_ack: true,
};
let transport: WsTransport<NoStorage> = WsTransport::new(None, ws_config);
transport.serve("127.0.0.1:3000").await.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
let config = ClientConfig {
url: "ws://127.0.0.1:3000/ws".to_string(),
reconnect_interval: Duration::from_secs(2),
max_retries: 5,
};
let mut publisher = EventBusClient::connect(config.clone()).await?;
let mut subscriber = EventBusClient::connect(config).await?;
subscriber.subscribe("chat.*", None).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let sub_handle: JoinHandle<Result<(), EventBusError>> = tokio::spawn(async move {
#[cfg(feature = "tracing")]
tracing::info!("Subscriber task started");
let mut received = 0;
loop {
let mut messages = subscriber.messages();
match tokio::time::timeout(Duration::from_secs(5), messages.next()).await {
Ok(Ok(msg)) => {
let payload = String::from_utf8_lossy(&msg.payload);
#[cfg(feature = "tracing")]
if received % 100 == 0 {
tracing::info!(received = received, "Subscriber received message: {}", payload);
}
subscriber.acknowledge(msg.seq, msg.message_id).await?;
received += 1;
if received == 1000 { break; }
}
Ok(Err(e)) => return Err(e),
Err(_) => {
#[cfg(feature = "tracing")]
tracing::warn!(received = received, "Subscriber timed out");
break;
}
}
}
println!("Subscriber received {} messages", received);
Ok(())
});
for i in 0..1000 {
publisher.publish("chat.stress", &format!("Stress message {}", i), None, Some(3600)).await?;
}
tokio::time::sleep(Duration::from_secs(5)).await;
drop(publisher);
let _ = sub_handle.await;
server_handle.abort();
println!("Stress test completed!");
Ok(())
}
cargo run --features storage_tracing
Key Steps:
WsTransport server task.EventBusClient."Hello from User1!") and batches with TTL (3600s).Output:
Subscriber received: [chat.user1] Hello from User1!
Subscriber received: [chat.user1] How's it going?
Subscriber received: [chat.user1] Batch message 1
Subscriber received: [chat.user1] Batch message 2
Test completed!
Run with tracing: RUST_LOG=debug cargo run.
Embed the event bus in an Axum server to host it alongside HTTP routes, publishing events internally.
http://127.0.0.1:3000 with event bus at ws://127.0.0.1:3000/ws.POST /api/users creates users and publishes user.created.EventBusClient connects lazily to publish events.Key Function:
pub fn axum_router<T: Clone + Sync + Send + 'static>(&self, state: T) -> Router<T> {
let storage = self.storage.clone();
let bus = self.bus.clone();
Router::new().route("/ws", get({
let storage = storage.clone();
move |ws| Self::handle_ws(ws, bus.clone(), storage.clone())
})).with_state(state)
}
Usage:
WsTransport::axum_router into the Axum router.curl -X POST http://127.0.0.1:3000/api/users -d '{"id": 1, "username": "alice"}' → user.created.Connect a separate microservice to the event bus to subscribe and process events, enabling cross-service synchronization.
http://127.0.0.1:5000, connects to ws://127.0.0.1:3000/ws.user.* in a background task.user.created, deserializes the payload and saves the user locally.Workflow:
user.created.Output:
Received message: CoreMessage { topic: "user.created", payload: "{\"id\":1,\"username\":\"alice\"}"... }
User created: User { id: 1, username: "alice" }
Events are JSON-serialized:
topic: String (e.g., "user.created", "chat.user1").payload: Bytes/String (e.g., {"id": 1, "username": "alice"}, "Hello from User1!").message_id: Optional string.ttl: Optional integer (seconds).seq: Auto-incremented sequence for acknowledgment.SQLite (eventbus.db):
SELECT * FROM messages;
-- seq | topic | payload | metadata | ttl | status
-- 1 | user.created | {"id": 1, "username": "alice"} | {} | 0 | pending
-- 2 | chat.user1 | Hello from User1! | {} | 3600 | processed
eventbus.db with messages and processed_messages tables.acknowledge to mark events as processed (consumer example).seq (if supported).cargo run → Runs server, publisher, and subscriber in one.cargo run → Hosts at ws://127.0.0.1:3000/ws.curl -X POST http://127.0.0.1:3000/api/users -d '{"id": 1, "username": "alice"}'.cargo run → Connects to ws://127.0.0.1:3000/ws, syncs users.tokio::time::timeout durations in subscribers.RUST_LOG=debug cargo run for detailed logs.Fork, branch, commit, and submit a pull request:
git checkout -b feature/your-featuregit commit -m "Add your feature"git push origin feature/your-featureMIT License. See LICENSE.