Crates.io | fuel-streams |
lib.rs | fuel-streams |
version | |
source | src |
created_at | 2024-08-24 15:55:34.584081+00 |
updated_at | 2025-03-15 00:02:14.72036+00 |
description | A library for working with streams of Fuel blockchain data |
homepage | https://fuel.network/ |
repository | https://github.com/fuellabs/data-systems |
max_upload_size | |
id | 1350386 |
size | 0 |
Fuel Streams is a Rust library designed for working with streams of Fuel blockchain data. It provides an efficient and user-friendly interface for developers to interact with real-time and historical blockchain data, offering support for Fuel-specific data types and leveraging NATS for scalable streaming.
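First, add these dependencies to your project. The examples use the #[tokio::main] macro, so tokio needs its macros and rt-multi-thread features enabled, and the multi-subject example further down also uses anyhow:
cargo add fuel-streams futures anyhow
cargo add tokio --features macros,rt-multi-thread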
Here are some examples to get you started with Fuel Streams:
use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client and establish a connection
    let mut client = Client::new(FuelNetwork::Local).with_api_key("your_key");
    let mut connection = client.connect().await?;

    println!("Listening for blocks...");

    // Choose which subjects to filter on
    let subjects = vec![BlocksSubject::new().into()];

    // Subscribe to blocks, delivering only messages that arrive after subscribing
    let mut stream = connection
        .subscribe(subjects, DeliverPolicy::New)
        .await?;

    while let Some(block) = stream.next().await {
        println!("Received block: {:?}", block);
    }

    Ok(())
}
Each data type has its own subject builder for filtering. Here's an example using transaction filtering:
use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::new(FuelNetwork::Local).with_api_key("your_key");
    let mut connection = client.connect().await?;

    println!("Listening for transactions...");

    // Create a subject for script transactions
    let subjects = vec![
        TransactionsSubject::new()
            .with_tx_type(Some(TransactionType::Script))
            .into()
    ];

    // Subscribe to the filtered transaction stream
    let mut stream = connection
        .subscribe(subjects, DeliverPolicy::New)
        .await?;

    while let Some(transaction) = stream.next().await {
        println!("Received transaction: {:?}", transaction);
    }

    Ok(())
}
Available subject builders include:
BlocksSubject::new()
TransactionsSubject::new()
InputsSubject::new()
OutputsSubject::new()
LogsSubject::new()
UtxosSubject::new()
Each subject builder provides specific filtering methods relevant to its data type. For example, TransactionsSubject allows filtering by transaction type using the with_tx_type() method.
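Since the library streams over NATS, one way to picture subject filtering is as NATS-style wildcard matching, where `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The sketch below uses only the standard library and is not the fuel-streams implementation: `TxSubjectSketch`, its fields, and the `transactions.{height}.{tx_type}` layout are hypothetical names chosen for illustration.

```rust
/// Returns true when `subject` matches `pattern` under NATS-style wildcard
/// rules: `*` matches exactly one token, `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must be last.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            (Some("*"), Some(_)) => continue,
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Hypothetical stand-in for a subject builder (assumption, not the real type):
/// every field left unset becomes a `*` wildcard in the resulting pattern.
#[derive(Default)]
struct TxSubjectSketch {
    block_height: Option<u64>,
    tx_type: Option<&'static str>,
}

impl TxSubjectSketch {
    fn new() -> Self {
        Self::default()
    }

    fn with_tx_type(mut self, tx_type: Option<&'static str>) -> Self {
        self.tx_type = tx_type;
        self
    }

    /// Renders the builder as a dot-separated pattern (hypothetical layout).
    fn to_pattern(&self) -> String {
        let height = self
            .block_height
            .map_or_else(|| "*".to_string(), |h| h.to_string());
        let tx_type = self.tx_type.unwrap_or("*");
        format!("transactions.{height}.{tx_type}")
    }
}

fn main() {
    let pattern = TxSubjectSketch::new()
        .with_tx_type(Some("script"))
        .to_pattern();
    println!("pattern: {pattern}");

    for subject in ["transactions.42.script", "transactions.42.create"] {
        println!("{subject} -> {}", subject_matches(&pattern, subject));
    }
}
```

Under these assumptions, leaving a field unset widens the filter, while setting it narrows the stream to messages whose subject carries that exact token.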
The Fuel Streams library allows you to subscribe to multiple types of data simultaneously. You can create instances of different subjects, such as BlocksSubject and TransactionsSubject, and pass them as a vector to the subscribe method:
use fuel_streams::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = Client::new(FuelNetwork::Local).with_api_key("test");
    let mut connection = client.connect().await?;

    println!("Listening for blocks and transactions...");

    let block_subject = BlocksSubject::new();
    let tx_subject = TransactionsSubject::new();
    let filter_subjects = vec![block_subject.into(), tx_subject.into()];

    // Subscribe to the block and transaction streams with the specified configuration
    let mut stream = connection
        .subscribe(filter_subjects, DeliverPolicy::FromBlock {
            block_height: 0.into(),
        })
        .await?;

    // Process incoming blocks and transactions
    while let Some(msg) = stream.next().await {
        let msg = msg?;
        match &msg.payload {
            MessagePayload::Block(block) => {
                println!("Received block: {:?}", block)
            }
            MessagePayload::Transaction(tx) => {
                println!("Received transaction: {:?}", tx)
            }
            _ => panic!("Wrong data"),
        };
    }

    Ok(())
}
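The example above combines two ideas: replaying from a starting block height and dispatching on the payload type. The following is a minimal in-memory sketch of both, using only the standard library. `PayloadSketch`, `DeliverSketch`, `deliver`, and `describe` are hypothetical names, not fuel-streams types, and treating the starting height as inclusive is an assumption.

```rust
/// Hypothetical payload type, loosely modeled on the payload match above.
#[derive(Debug, Clone, PartialEq)]
enum PayloadSketch {
    Block { height: u64 },
    Transaction { height: u64, id: String },
}

impl PayloadSketch {
    /// Block height the message belongs to.
    fn height(&self) -> u64 {
        match self {
            PayloadSketch::Block { height } => *height,
            PayloadSketch::Transaction { height, .. } => *height,
        }
    }
}

/// Hypothetical delivery policy covering the two behaviors this README describes.
#[derive(Debug, Clone, Copy)]
enum DeliverSketch {
    /// Only messages that arrive after the subscription starts.
    New,
    /// Replay stored messages at or above `block_height` (assumed inclusive),
    /// then continue with live messages.
    FromBlock { block_height: u64 },
}

/// Selects what a subscriber would see. `history` holds messages stored before
/// the subscription; `live` holds messages that arrive afterwards.
fn deliver(
    policy: DeliverSketch,
    history: &[PayloadSketch],
    live: &[PayloadSketch],
) -> Vec<PayloadSketch> {
    let replayed: Vec<PayloadSketch> = match policy {
        DeliverSketch::New => Vec::new(),
        DeliverSketch::FromBlock { block_height } => history
            .iter()
            .filter(|msg| msg.height() >= block_height)
            .cloned()
            .collect(),
    };
    replayed.into_iter().chain(live.iter().cloned()).collect()
}

/// Dispatches on the payload variant, mirroring the `match` in the example above.
fn describe(msg: &PayloadSketch) -> String {
    match msg {
        PayloadSketch::Block { height } => format!("block at height {height}"),
        PayloadSketch::Transaction { height, id } => {
            format!("transaction {id} at height {height}")
        }
    }
}

fn main() {
    let history = vec![
        PayloadSketch::Block { height: 1 },
        PayloadSketch::Transaction { height: 1, id: "tx-a".into() },
        PayloadSketch::Block { height: 2 },
    ];
    let live = vec![PayloadSketch::Block { height: 3 }];

    // Replay from height 2, then continue with live messages.
    for msg in deliver(DeliverSketch::FromBlock { block_height: 2 }, &history, &live) {
        println!("{}", describe(&msg));
    }

    // With the `New` policy, stored history is skipped entirely.
    let only_new = deliver(DeliverSketch::New, &history, &live);
    println!("New policy delivers {} message(s)", only_new.len());
}
```

Because every variant is listed explicitly in `describe`, the compiler flags any payload type added later, which is one alternative to a catch-all arm that panics.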
DeliverPolicy Options
The DeliverPolicy enum controls how messages are delivered in your subscriptions:
New: Delivers only new messages that arrive after subscription
FromBlock { block_height }: Delivers messages starting from a specific block height (the variant used in the multi-subject example above)
Contributions are welcome! Please feel free to submit a Pull Request.
For more information on contributing, please see the CONTRIBUTING.md file in the root of the repository.
This project is licensed under the Apache-2.0 license. See LICENSE for more information.