Crates.io | krossbar-rpc |
lib.rs | krossbar-rpc |
version | 0.5.7 |
source | src |
created_at | 2024-06-07 12:40:02.164477 |
updated_at | 2024-06-19 12:48:39.972074 |
description | Krossbar RPC library |
homepage | https://krossbar.rs |
repository | https://github.com/krossbar-platform/krossbar-common |
max_upload_size | |
id | 1264748 |
size | 71,198 |
RPC library used by Krossbar platform for communication.
The library:
Use the [rpc::Rpc::poll] method to poll the stream. This includes polling while waiting for a call or subscription response.
RPC calls:
use futures::{select, FutureExt};
use tokio::net::UnixStream;
use krossbar_rpc::rpc::Rpc;
/// Calls the `echo` endpoint with `42` and prints the response.
async fn call() {
    let socket = UnixStream::connect("/tmp/hub.sock").await.unwrap();
    let mut rpc = Rpc::new(socket);

    // Awaiting `Rpc::call` hands back a future that later resolves to the response.
    let pending_response = rpc.call::<u32, u32>("echo", &42).await.unwrap();

    // The stream is polled while the response is awaited; the first future
    // to complete ends the `select!`.
    select! {
        response = pending_response.fuse() => {
            println!("Call response: {response:?}")
        },
        _ = rpc.poll().fuse() => {}
    }
}
RPC subscription:
use futures::{select, FutureExt, StreamExt};
use tokio::net::UnixStream;
use krossbar_rpc::rpc::Rpc;
/// Subscribes to the `signal` endpoint and prints the first two items received.
async fn subscribe() {
    let socket = UnixStream::connect("/tmp/hub.sock").await.unwrap();
    let mut rpc = Rpc::new(socket);

    let signals = rpc.subscribe::<u32>("signal").await.unwrap();

    // The stream is polled while the subscription items are being collected.
    select! {
        // Only two items are taken from the subscription before printing.
        response = signals.take(2).collect::<Vec<krossbar_rpc::Result<u32>>>() => {
            println!("Subscription response: {response:?}")
        },
        _ = rpc.poll().fuse() => {}
    }
}
One-way message:
use futures::{select, FutureExt};
use tokio::net::UnixStream;
use krossbar_rpc::rpc::Rpc;
/// Sends a one-way message to the `echo` endpoint, then polls the stream once.
async fn message() {
    let socket = UnixStream::connect("/tmp/hub.sock").await.unwrap();
    let mut rpc = Rpc::new(socket);

    // Unlike the call example, no response future is raced here.
    let sent = rpc.send_message("echo", &42).await.unwrap();

    // A single poll of the stream; its result is whatever `Rpc::poll` yields next.
    let incoming_message = rpc.poll().await;
}
Polling incoming messages:
use futures::{select, FutureExt};
use tokio::net::UnixStream;
use krossbar_rpc::{rpc::Rpc, request::Body};
/// Handles incoming requests in a loop until `Rpc::poll` returns `None`.
///
/// Every request's endpoint is printed, then its body is handled by kind:
/// messages are only logged, calls are answered with the received payload,
/// subscriptions get three responses, and connection requests are accepted.
async fn poll() {
    let stream = UnixStream::connect("/tmp/hub.sock").await.unwrap();
    let mut rpc = Rpc::new(stream);

    // `while let` ends the loop on `None`, so no separate
    // `is_none()` check followed by `unwrap()` is needed.
    while let Some(mut request) = rpc.poll().await {
        println!("Incoming method call: {}", request.endpoint());

        match request.take_body().unwrap() {
            Body::Message(bson) => {
                println!("Incoming message: {bson:?}");
            },
            Body::Call(bson) => {
                println!("Incoming call: {bson:?}");
                // Echo the received payload back to the caller.
                request.respond(Ok(bson)).await;
            },
            Body::Subscription => {
                println!("Incoming subscription");
                // Several responses are sent for one subscription request.
                request.respond(Ok(41)).await;
                request.respond(Ok(42)).await;
                request.respond(Ok(43)).await;
            },
            Body::Fd { client_name, .. } => {
                println!("Incoming connection request from {client_name}");
                request.respond(Ok(())).await;
            }
        }
    }

    // Reached only after `Rpc::poll` has returned `None`.
    println!("Client disconnected");
}
See `tests/` for more examples.