Crates.io | pallas-multiplexer |
lib.rs | pallas-multiplexer |
version | 0.18.3 |
source | src |
created_at | 2021-12-03 00:06:25.215829 |
updated_at | 2024-10-23 20:19:39.697668 |
description | Multithreaded Ouroboros multiplexer implementation using mpsc channels |
homepage | https://github.com/txpipe/pallas |
repository | https://github.com/txpipe/pallas |
id | 491477 |
size | 188,678 |
This is an implementation of the Ouroboros multiplexer logic as defined in the Shelley Networking Protocol specs.
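To make the term "multiplexer" concrete, here is a minimal sketch of how a single segment header could be encoded and decoded. It assumes the 8-byte, big-endian layout commonly described for the Shelley network spec (32-bit transmission time, 1 mode bit, 15-bit mini-protocol id, 16-bit payload length). The `SegmentHeader` type and its field names are hypothetical and are not part of the pallas-multiplexer API.

```rust
/// Hypothetical model of one multiplexer segment header.
/// Layout is an assumption based on the Shelley network spec:
/// 4 bytes time | 1 bit mode + 15 bits protocol id | 2 bytes length.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SegmentHeader {
    /// Transmission time (lower 32 bits of a microsecond clock).
    timestamp: u32,
    /// Mode bit: false = sent by the initiator, true = sent by the responder.
    from_responder: bool,
    /// Mini-protocol id; only the lower 15 bits are kept when encoding.
    protocol_id: u16,
    /// Number of payload bytes that follow the header.
    payload_len: u16,
}

impl SegmentHeader {
    /// Serialize the header into 8 big-endian bytes.
    fn encode(&self) -> [u8; 8] {
        let mode: u16 = if self.from_responder { 0x8000 } else { 0 };
        let id = mode | (self.protocol_id & 0x7FFF);
        let mut out = [0u8; 8];
        out[0..4].copy_from_slice(&self.timestamp.to_be_bytes());
        out[4..6].copy_from_slice(&id.to_be_bytes());
        out[6..8].copy_from_slice(&self.payload_len.to_be_bytes());
        out
    }

    /// Parse 8 big-endian bytes back into a header.
    fn decode(bytes: [u8; 8]) -> Self {
        let id = u16::from_be_bytes([bytes[4], bytes[5]]);
        SegmentHeader {
            timestamp: u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
            from_responder: id & 0x8000 != 0,
            protocol_id: id & 0x7FFF,
            payload_len: u16::from_be_bytes([bytes[6], bytes[7]]),
        }
    }
}

fn main() {
    let header = SegmentHeader {
        timestamp: 1_000,
        from_responder: false,
        protocol_id: 2,
        payload_len: 5,
    };
    let bytes = header.encode();
    // Round-trip check: decoding the bytes gives back the same header.
    assert_eq!(SegmentHeader::decode(bytes), header);
    println!("{:02x?}", bytes);
}
```

Because the mini-protocol id travels in every header, segments from different mini-protocols can be interleaved on one bearer and still be routed to the right consumer.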
The following architectural decisions were made for this particular Rust implementation:
Given the above definitions, Rust's mpsc channels seem like the correct artifact to orchestrate the communication between the different threads in the multiplexer process.
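As a rough illustration of why mpsc channels fit this job, the sketch below uses only the standard library: several producer threads share clones of one `Sender` (fan-in), and a single routing thread forwards each payload to a per-protocol `Receiver` (fan-out). The names `Tagged` and `spawn_router` are hypothetical, and the routing thread stands in for the bearer. This is not how pallas-multiplexer is implemented internally, only the general pattern.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// A payload tagged with the mini-protocol id it belongs to (hypothetical type).
type Tagged = (u16, Vec<u8>);

/// Build one shared ingress channel plus one egress channel per protocol id,
/// and spawn a thread that routes every tagged payload to its egress channel.
fn spawn_router(protocols: &[u16]) -> (Sender<Tagged>, HashMap<u16, Receiver<Vec<u8>>>) {
    let (ingress_tx, ingress_rx) = channel::<Tagged>();
    let mut egress_tx: HashMap<u16, Sender<Vec<u8>>> = HashMap::new();
    let mut egress_rx = HashMap::new();
    for &id in protocols {
        let (tx, rx) = channel();
        egress_tx.insert(id, tx);
        egress_rx.insert(id, rx);
    }
    // Stand-in for the bearer: whatever is muxed in is demuxed straight out.
    // The loop ends once every clone of the ingress sender has been dropped.
    thread::spawn(move || {
        for (id, payload) in ingress_rx {
            if let Some(tx) = egress_tx.get(&id) {
                // Ignore send errors: the consumer may already be gone.
                let _ = tx.send(payload);
            }
        }
    });
    (ingress_tx, egress_rx)
}

fn main() {
    let (ingress, mut egress) = spawn_router(&[0, 2]);
    let rx0 = egress.remove(&0).unwrap();
    let rx2 = egress.remove(&2).unwrap();

    // Two independent producers share the same ingress channel.
    let tx_a = ingress.clone();
    let a = thread::spawn(move || tx_a.send((0, vec![1, 2, 3])).unwrap());
    let tx_b = ingress.clone();
    let b = thread::spawn(move || tx_b.send((2, vec![9; 5])).unwrap());
    a.join().unwrap();
    b.join().unwrap();

    println!("protocol 0 got {} bytes", rx0.recv().unwrap().len());
    println!("protocol 2 got {} bytes", rx2.recv().unwrap().len());
}
```

The "multiple producer, single consumer" shape matches the ingress side directly: any number of mini-protocol threads can hold a sender, while exactly one thread owns the bearer.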
The following diagram provides an overview of the components involved:
The following code provides a very rough example of how to set up a client that connects to a node and spawns two concurrent threads running independently, both communicating over the same bearer using the Pallas multiplexer.
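// Standard-library items used by this example.
use std::os::unix::net::UnixStream;
use std::thread;
use std::time::Duration;

// Set up a new bearer. In this case, we use a unix socket to connect
// to a node running on the local machine.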
let bearer = UnixStream::connect("/tmp/pallas").unwrap();
// Set up a new multiplexer using the created bearer and a specification
// of the mini-protocol IDs that we'll be using for our session. In this case, we
// pass id #0 (handshake) and #2 (chainsync).
let muxer = Multiplexer::setup(bearer, &[0, 2]);
// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut channel_0 = muxer.use_channel(0);
// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
    // Deconstruct the channel to get a handle for sending data into the muxer
    // ingress and a handle to receive data from the demuxer egress.
    let Channel(mux_tx, demux_rx) = channel_0;
    // Do something with the channel. In this case, we just keep sending
    // dummy data every 50 millis.
    loop {
        let payload = vec![1; 65545];
        mux_tx.send(payload).unwrap();
        thread::sleep(Duration::from_millis(50));
    }
});
// Ask the multiplexer to provide us with the channel for the miniprotocol #2.
let mut channel_2 = muxer.use_channel(2);
// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {
    // Deconstruct the channel to get a handle for sending data into the muxer
    // ingress and a handle to receive data from the demuxer egress.
    let Channel(mux_tx, demux_rx) = channel_2;
    // Do something with the channel. In this case, we just print to stdout
    // whatever gets received for this mini-protocol.
    loop {
        let payload = demux_rx.recv().unwrap();
        println!("id:2, length:{}", payload.len());
    }
});
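The first thread above sends 65,545-byte payloads, which is more than a 16-bit length field can describe in a single segment. The sketch below shows one way such a payload could be split before hitting the wire. It assumes a maximum segment payload of 65,535 bytes; `MAX_SEGMENT_PAYLOAD` and `segment_payload` are hypothetical names, and whether pallas-multiplexer chunks payloads exactly like this is not something the example claims.

```rust
/// Largest payload a single segment can carry, assuming a 16-bit length field.
const MAX_SEGMENT_PAYLOAD: usize = u16::MAX as usize;

/// Split a payload into slices that each fit in one segment.
/// Every slice except possibly the last has the maximum length.
fn segment_payload(payload: &[u8]) -> Vec<&[u8]> {
    payload.chunks(MAX_SEGMENT_PAYLOAD).collect()
}

fn main() {
    // Same size as the dummy payload used by the sender thread.
    let payload = vec![1u8; 65545];
    let segments = segment_payload(&payload);
    for (i, segment) in segments.iter().enumerate() {
        println!("segment {i}: {} bytes", segment.len());
    }
}
```

With these assumptions, the 65,545-byte payload becomes two segments of 65,535 and 10 bytes, which the receiving side would have to reassemble in order.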
For a working example of two peers communicating (a sender and a listener), check the examples folder. To run the examples, open two different terminals and run a different peer in each one:
# on terminal 1, start the listener
RUST_LOG=info cargo run --example listener
# on terminal 2, start the sender
RUST_LOG=info cargo run --example sender
For a more complex, real-world example, check the Oura repo, which provides a full-blown client tool designed to live-stream block data from a local or remote node.