| Crates.io | flamewire-bittensor-indexer |
| lib.rs | flamewire-bittensor-indexer |
| version | 0.1.3 |
| created_at | 2025-07-10 14:59:55.746185+00 |
| updated_at | 2025-07-16 13:44:22.083159+00 |
| description | Rust Flamewire Indexer for Bittensor |
| homepage | |
| repository | https://github.com/unitone-labs/bittensor-indexer |
| max_upload_size | |
| id | 1746605 |
| size | 277,394 |
A high-performance, production-ready Rust indexer for the Bittensor blockchain with advanced event processing, resilient error handling, and flexible storage options.
Flamewire Bittensor Indexer is an enterprise-grade solution for indexing and processing Bittensor blockchain data. Built with Rust for maximum performance and safety, it provides a comprehensive framework for real-time blockchain data processing with advanced features like circuit breakers, automatic retries, parallel processing, and multiple storage backends.
Add this to your Cargo.toml:
[dependencies]
flamewire-bittensor-indexer = "0.1.3"

# Or, with optional features enabled:
flamewire-bittensor-indexer = { version = "0.1.3", features = ["postgres", "sqlite"] }
json-storage (default): JSON file-based checkpoint storage
postgres: PostgreSQL database backend
sqlite: SQLite database backend
testing: Additional testing utilities

use flamewire_bittensor_indexer::prelude::*;
struct EventLogger;

#[async_trait]
impl Handler<SubstrateConfig> for EventLogger {
    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        println!(
            "Block {}: {}.{}",
            ctx.block_number,
            event.pallet_name(),
            event.variant_name()
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut indexer = IndexerBuilder::<SubstrateConfig>::new()
        .connect(WebSocketUrl::parse("wss://archive.chain.opentensor.ai:443")?)
        .start_from_block(1000)
        .add_handler(EventLogger)
        .build()
        .await?;

    indexer.run().await?;
    Ok(())
}
use flamewire_bittensor_indexer::prelude::*;

#[derive(Debug, Decode, DecodeAsType)]
struct TransferEvent {
    from: AccountId32,
    to: AccountId32,
    amount: u128,
}

impl StaticEvent for TransferEvent {
    const PALLET: &'static str = "Balances";
    const EVENT: &'static str = "Transfer";
}

struct TransferProcessor;

#[async_trait]
impl Handler<SubstrateConfig> for TransferProcessor {
    fn event_filter(&self) -> EventFilter {
        EventFilter::event("Balances", "Transfer")
    }

    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        if let Some(transfer) = event.as_event::<TransferEvent>()? {
            println!(
                "Transfer: {} -> {} (Amount: {})",
                transfer.from, transfer.to, transfer.amount
            );
        }
        Ok(())
    }

    async fn handle_error(&self, error: &IndexerError, ctx: &Context<SubstrateConfig>) {
        eprintln!("Error processing transfer at block {}: {}", ctx.block_number, error);
    }
}
let pipeline = HandlerGroup::new()
    .add(DataExtractor)       // Extract data from events
    .pipe_to(DataTransformer) // Transform extracted data
    .pipe_to(DataSaver);      // Save to database

let mut indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .add_handler_group(pipeline)
    .build()
    .await?;

let parallel_handlers = HandlerGroup::parallel()
    .add(DatabaseSaver)    // Save to primary database
    .add(BackupSaver)      // Save to backup storage
    .add(MetricsCollector) // Update metrics
    .add(CacheUpdater);    // Update cache

let mut indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .add_handler_group(parallel_handlers)
    .build()
    .await?;

let critical_pipeline = HandlerGroup::new()
    .strict() // Stop on first error
    .add(DataValidator)
    .add(CriticalDataSaver);

let conditional_group = HandlerGroup::new()
    .add_conditional(TransferHandler, |event| {
        event.pallet_name() == "Balances" && event.variant_name() == "Transfer"
    })
    .add_conditional(StakingHandler, |event| {
        event.pallet_name() == "Staking"
    });
let indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    // JSON storage in ./database/checkpoint.json (default)
    .build()
    .await?;

let indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .with_sqlite("sqlite://./indexer.db")
    .build()
    .await?;

let indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .with_postgres("postgres://user:password@localhost:5432/bittensor_data")
    .build()
    .await?;

let indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .start_from_block(1_000_000) // Start from specific block
    .end_at_block(2_000_000)     // Process until this block
    .build()
    .await?;
use flamewire_bittensor_indexer::{RetryConfig, CircuitBreaker};
use std::time::Duration;
let retry_config = RetryConfig {
    max_retries: 5,
    initial_delay: Duration::from_millis(100),
    max_delay: Duration::from_secs(30),
    backoff_multiplier: 2.0,
};
let circuit_breaker = CircuitBreaker::new(3, Duration::from_secs(60));
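With this configuration each retry waits roughly initial_delay * backoff_multiplier^attempt, capped at max_delay (100 ms, 200 ms, 400 ms, ... up to 30 s). A minimal sketch of that schedule; the helper below is hypothetical and the crate's exact rounding or jitter may differ:

use std::time::Duration;

// Hypothetical helper, for illustration only: the delay implied by the RetryConfig above
// for a given retry attempt (0-based), capped at the configured maximum.
fn nth_backoff_delay(initial: Duration, multiplier: f64, max: Duration, attempt: u32) -> Duration {
    initial.mul_f64(multiplier.powi(attempt as i32)).min(max)
}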
// `RobustHandler` stands in for any handler type; declared here so the example is self-contained.
struct RobustHandler;

#[async_trait]
impl Handler<SubstrateConfig> for RobustHandler {
    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        // Your processing logic
        Ok(())
    }

    async fn handle_error(&self, error: &IndexerError, ctx: &Context<SubstrateConfig>) {
        match error {
            IndexerError::ConnectionFailed { url, source } => {
                eprintln!("Connection failed to {}: {}", url, source);
            }
            IndexerError::EventDecodingFailed { pallet, event, block, .. } => {
                eprintln!("Failed to decode {}.{} at block {}", pallet, event, block);
            }
            IndexerError::HandlerFailed { handler, block, .. } => {
                eprintln!("Handler {} failed at block {}", handler, block);
            }
            IndexerError::CheckpointError { operation, backend, .. } => {
                eprintln!("Checkpoint {} failed on {}", operation, backend);
            }
            _ => eprintln!("Other error: {}", error),
        }
    }
}
use std::sync::Arc;
use flamewire_bittensor_indexer::{CircuitBreaker, retry_with_backoff, RetryConfig};

struct ExternalServiceHandler {
    circuit_breaker: Arc<CircuitBreaker>,
}

#[async_trait]
impl Handler<SubstrateConfig> for ExternalServiceHandler {
    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        if self.circuit_breaker.is_open() {
            println!("Circuit breaker open - skipping external service call");
            return Ok(());
        }

        // Attempt the external service call with retry and backoff
        let result = retry_with_backoff(
            || async { /* external service call */ Ok(()) },
            &RetryConfig::default(),
            &self.circuit_breaker,
        ).await;

        // Feed the outcome back into the circuit breaker before propagating it
        match result {
            Ok(_) => self.circuit_breaker.record_success(),
            Err(_) => self.circuit_breaker.record_failure(),
        }

        result
    }
}
// Process all events
EventFilter::all()
// Process all events from a specific pallet
EventFilter::pallet("Balances")
// Process specific events only
EventFilter::event("Balances", "Transfer")
struct DynamicHandler {
    target_pallets: Vec<String>,
}

#[async_trait]
impl Handler<SubstrateConfig> for DynamicHandler {
    fn event_filter(&self) -> EventFilter {
        EventFilter::all() // We'll filter manually
    }

    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        if self.target_pallets.contains(&event.pallet_name().to_string()) {
            // Process this event
            println!("Processing {} event", event.pallet_name());
        }
        Ok(())
    }
}
struct DataExtractor;

#[async_trait]
impl Handler<SubstrateConfig> for DataExtractor {
    async fn handle_event(
        &self,
        event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        if let Some(transfer) = event.as_event::<TransferEvent>()? {
            // Store data for the next handler in the pipeline
            ctx.set_pipeline_data("transfer", transfer);
        }
        Ok(())
    }
}

struct DataProcessor;

#[async_trait]
impl Handler<SubstrateConfig> for DataProcessor {
    async fn handle_event(
        &self,
        _event: &ChainEvent<SubstrateConfig>,
        ctx: &Context<SubstrateConfig>,
    ) -> Result<(), IndexerError> {
        // Retrieve data stored by the previous handler
        if let Some(transfer) = ctx.get_pipeline_data::<TransferEvent>("transfer") {
            println!("Processing transfer: {:?}", transfer);
        }
        Ok(())
    }
}
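Chained together with the pipeline API shown earlier, the extractor runs first and the processor consumes what it stored:

let pipeline = HandlerGroup::new()
    .add(DataExtractor)      // Decodes the transfer and stores it in the pipeline context
    .pipe_to(DataProcessor); // Reads the stored transfer back out and processes it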
# Run all tests
cargo test --all-features
# Run unit tests only
cargo test --test unit
# Run integration tests
cargo test --test integration
# Run with logging
RUST_LOG=debug cargo test --all-features
The indexer includes comprehensive property-based tests using proptest:
# Run property-based tests
cargo test prop_ --all-features
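A minimal sketch of what such a property test looks like; the clamp_start_block helper below is hypothetical and used only for illustration, not part of the crate's API:

use proptest::prelude::*;

// Hypothetical helper, for illustration only: clamp a requested start block to the chain tip.
fn clamp_start_block(requested: u64, chain_tip: u64) -> u64 {
    requested.min(chain_tip)
}

proptest! {
    #[test]
    fn prop_start_block_never_exceeds_tip(requested in any::<u64>(), tip in any::<u64>()) {
        prop_assert!(clamp_start_block(requested, tip) <= tip);
    }
}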
// CPU-intensive handlers benefit from parallel execution
let cpu_intensive = HandlerGroup::parallel()
    .add(DataAnalyzer)
    .add(MetricsCalculator)
    .add(ReportGenerator);

// For high-throughput scenarios, use PostgreSQL
let indexer = IndexerBuilder::<SubstrateConfig>::new()
    .connect(WebSocketUrl::parse("wss://node.url")?)
    .with_postgres("postgres://user:pass@localhost/db?sslmode=require")
    .build()
    .await?;
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
Clone the repository:
git clone https://github.com/unitone-labs/bittensor-indexer.git
cd bittensor-indexer
Install Rust (1.88.0 or later):
rustup update stable
Run tests:
cargo test --all-features
Check formatting and linting:
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
Use cargo fmt for formatting.

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
If you encounter issues or have questions, please open an issue on the GitHub repository.
Built with ❤️ for the Bittensor ecosystem by Flamewire.
Special thanks to the Bittensor community and the Rust ecosystem maintainers.