Crates.io | kiteticker-async-manager |
lib.rs | kiteticker-async-manager |
version | 0.3.4 |
created_at | 2025-06-21 00:41:46.485169+00 |
updated_at | 2025-09-04 01:21:51.735952+00 |
description | High-performance async WebSocket client for Kite Connect API with multi-connection support, dynamic subscription management, and optimized data processing. |
homepage | |
repository | https://github.com/SPRAGE/kiteticker-async-manager |
max_upload_size | |
id | 1720412 |
size | 604,796 |
High-performance async WebSocket client for the Kite Connect API with multi-connection support and dynamic subscription management.
π Documentation | π Getting Started | π Examples | π§ API Reference
Add to your Cargo.toml
:
[dependencies]
kiteticker-async-manager = "0.2.1"
tokio = { version = "1.0", features = ["full"] }
use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode, TickerMessage};
#[tokio::main]
async fn main() -> Result<(), String> {
// Setup credentials
let api_key = std::env::var("KITE_API_KEY").unwrap();
let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
// Create high-performance manager
let config = KiteManagerConfig {
max_connections: 3,
max_symbols_per_connection: 3000,
enable_dedicated_parsers: true,
default_mode: Mode::LTP,
..Default::default()
};
// Start manager
let mut manager = KiteTickerManager::new(api_key, access_token, config);
manager.start().await?;
// Subscribe to symbols (automatically distributed across connections)
let symbols = vec![256265, 408065, 738561]; // NIFTY 50, HDFC Bank, Reliance
manager.subscribe_symbols(&symbols, Some(Mode::Quote)).await?;
// Process data from independent channels
let channels = manager.get_all_channels();
for (channel_id, mut receiver) in channels {
tokio::spawn(async move {
while let Ok(message) = receiver.recv().await {
if let TickerMessage::Ticks(ticks) = message {
for tick in ticks {
println!("Channel {:?}: {} @ βΉ{:.2}",
channel_id,
tick.instrument_token,
tick.content.last_price.unwrap_or(0.0));
}
}
}
});
}
// Add symbols dynamically
manager.subscribe_symbols(&[5633, 884737], Some(Mode::Full)).await?;
// Remove symbols
manager.unsubscribe_symbols(&[408065]).await?;
// Change subscription mode
manager.change_mode(&[256265], Mode::Full).await?;
Ok(())
}
Feature | Single Connection | Multi-Connection Manager | Improvement |
---|---|---|---|
Max Symbols | 3,000 | 9,000 | 3x capacity |
Throughput | Limited by 1 connection | 3 parallel connections | 3x throughput |
Latency | ~5-10Β΅s | ~1-2Β΅s | 5x faster |
Resilience | Single point of failure | 3 independent connections | High availability |
Dynamic Ops | Manual reconnection | Runtime add/remove | Zero downtime |
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β KiteTickerManager β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β π Symbol Distribution (9,000 symbols max) β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
β βConnection 1 β βConnection 2 β βConnection 3 β β
β β3,000 symbolsβ β3,000 symbolsβ β3,000 symbolsβ β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β‘ Dedicated Parser Tasks (CPU Optimized) β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
β β Parser 1 β β Parser 2 β β Parser 3 β β
β β ~1Β΅s latencyβ β ~1Β΅s latencyβ β ~1Β΅s latencyβ β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β π‘ Independent Output Channels β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
β β Channel 1 β β Channel 2 β β Channel 3 β β
β βbroadcast::Rxβ βbroadcast::Rxβ βbroadcast::Rxβ β
β βββββββββββββββ βββββββββββββββ βββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Use Case | Configuration | Symbols | Example |
---|---|---|---|
Portfolio Monitoring | 1 connection, Quote mode | 10-50 | Track personal investments |
Algorithmic Trading | 3 connections, Quote mode | 100-1,000 | Trading strategies |
Market Scanner | 3 connections, LTP mode | 1,000-9,000 | Scan entire market |
High-Frequency Trading | 3 connections, Full mode | 500-3,000 | Order book analysis |
let config = KiteManagerConfig {
max_connections: 1,
max_symbols_per_connection: 100,
default_mode: Mode::Full,
..Default::default()
};
let config = KiteManagerConfig {
max_connections: 3,
max_symbols_per_connection: 3000,
connection_buffer_size: 20000,
parser_buffer_size: 50000,
enable_dedicated_parsers: true,
default_mode: Mode::LTP,
..Default::default()
};
Feature | Official kiteconnect-rs | kiteticker-async-manager |
---|---|---|
Maintenance | β Unmaintained | β Actively maintained |
Async Support | β Callback-based | β Full async/await |
Type Safety | β Untyped JSON | β Fully typed structs |
Multi-Connection | β Single connection | β Up to 3 connections |
Dynamic Subscriptions | β Manual reconnection | β Runtime add/remove |
Performance | β Basic | β High-performance optimized |
Error Handling | β Limited | β Comprehensive |
# Install Rust and tools
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
cargo install just # Task runner
# Clone and build
git clone https://github.com/SPRAGE/kiteticker-async-manager.git
cd kiteticker-async-manager
just build
# Set API credentials
export KITE_API_KEY=your_api_key
export KITE_ACCESS_TOKEN=your_access_token
# Run examples
cargo run --example single_connection
cargo run --example dynamic_subscription_demo
cargo run --example raw_full_peek --release
cargo run --example raw_vs_parsed --release
just --list
Contributions are welcome! Please see our contribution guidelines.
Use just to run development tasks:
just --list # Show available tasks
just build # Build the project
just check # Check code formatting and lints
Licensed under the Apache License, Version 2.0. See LICENSE for details.
β Star this repository if you find it useful!
For maximum throughput with minimal allocations, you can work directly with raw WebSocket frame bytes and view packet bodies using endian-safe, zero-copy structs.
Key points:
subscribe_raw_frames()
on KiteTickerAsync
, or
via the manager using get_raw_frame_channel(ChannelId)
or get_all_raw_frame_channels()
as_tick_raw
, as_index_quote_32
, or as_inst_header_64
zerocopy::Ref<&[u8], T>
dereferences to &T
and is valid while the backing bytes live (store Bytes
to keep alive)Example snippet:
use kiteticker_async_manager::{KiteTickerAsync, Mode, as_tick_raw};
use bytes::Bytes;
# async fn demo(mut ticker: KiteTickerAsync) -> Result<(), String> {
let mut frames = ticker.subscribe_raw_frames();
let frame: Bytes = frames.recv().await.unwrap();
let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
let mut off = 2usize;
for _ in 0..num {
let len = u16::from_be_bytes([frame[off], frame[off+1]]) as usize;
let body = frame.slice(off+2..off+2+len);
if len == 184 {
if let Some(view_ref) = as_tick_raw(&body) {
let tick = &*view_ref;
println!("token={} ltp_scaled={}", tick.header.instrument_token.get(), tick.header.last_price.get());
}
}
off += 2 + len;
}
Ok(())
# }
Safety: All raw structs derive Unaligned
and use big-endian wrappers; no unsafe
is required.
use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode, ChannelId, as_tick_raw};
# #[tokio::main]
# async fn main() -> Result<(), String> {
let api_key = std::env::var("KITE_API_KEY").unwrap();
let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
let mut mgr = KiteTickerManagerBuilder::new(api_key, access_token)
.raw_only(true)
.build();
mgr.start().await?;
mgr.subscribe_symbols(&[256265], Some(Mode::Full)).await?;
for (id, mut rx) in mgr.get_all_raw_frame_channels() {
tokio::spawn(async move {
while let Ok(frame) = rx.recv().await {
if frame.len() < 2 { continue; }
let mut off = 2usize;
let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
for _ in 0..num {
if off + 2 > frame.len() { break; }
let len = u16::from_be_bytes([frame[off], frame[off+1]]) as usize;
let body = frame.slice(off+2..off+2+len);
if len == 184 {
if let Some(view) = as_tick_raw(&body) {
let token = view.header.instrument_token.get();
println!("conn={:?} token={}", id, token);
}
}
off += 2 + len;
}
}
});
}
# Ok(()) }
Or, if you only want Full depth packets, use the helper:
use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode, ChannelId, KiteTickerRawSubscriber184};
# #[tokio::main]
# async fn main() -> Result<(), String> {
let api_key = std::env::var("KITE_API_KEY").unwrap();
let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
let mut mgr = KiteTickerManagerBuilder::new(api_key, access_token)
.raw_only(true)
.build();
mgr.start().await?;
mgr.subscribe_symbols(&[256265], Some(Mode::Full)).await?;
if let Some(mut sub) = mgr.get_full_raw_subscriber(ChannelId::Connection1) {
tokio::spawn(async move {
while let Ok(Some(view)) = sub.recv_raw_tickraw().await {
let t = &*view; // &TickRaw
println!("token={} ltp={}", t.header.instrument_token.get(), t.header.last_price.get());
}
});
}
# Ok(()) }