| Crates.io | kdb_codec |
| lib.rs | kdb_codec |
| version | 1.1.0 |
| created_at | 2025-12-26 04:14:06.318519+00 |
| updated_at | 2026-01-13 18:28:32.691747+00 |
| description | Kdb+ IPC codec library for handling kdb+ wire protocol data with Rust. |
| homepage | |
| repository | https://github.com/yshing/kdb_codec |
| max_upload_size | |
| id | 2005253 |
| size | 713,037 |
Cancellation-safe Rust codec + client for the kdb+ IPC wire protocol (q/kdb+).
Docs: https://yshing.github.io/kdb_codec/
tokio-util::codec::Framed-18!/-19!)QStream) and lower-level codec API (KdbCodec)K value type for building/inspecting q objectsuse futures::{SinkExt, StreamExt};
use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let mut framed = Framed::new(stream, KdbCodec::new(true));
let query = K::new_string("1+1".to_string(), 0);
framed
.send(KdbMessage::new(qmsg_type::synchronous, query))
.await?;
if let Some(Ok(resp)) = framed.next().await {
println!("{}", resp.payload);
}
Ok(())
}
This project prioritizes safety when decoding untrusted IPC bytes (no panics, no OOM).
### Table Column Access
Access table columns by name using string indices:
```rust
use kdb_codec::*;
// Create a table
let table = k!(table: {
"fruit" => k!(sym: vec!["apple", "banana", "cherry"]),
"price" => k!(float: vec![1.5, 2.3, 3.8]),
"quantity" => k!(long: vec![100, 150, 75])
});
// Access columns by name
let fruits = &table["fruit"];
let prices = &table["price"];
let quantities = &table["quantity"];
println!("Fruits: {}", fruits); // `apple`banana`cherry
println!("Prices: {}", prices); // 1.5 2.3 3.8
println!("Quantities: {}", quantities); // 100 150 75
// Mutable access
let mut table = k!(table: {
"price" => k!(float: vec![1.5, 2.3])
});
table["price"] = k!(float: vec![2.0, 2.5]); // Update prices
For production code, use the safe try_* methods that return Result instead of panicking:
use kdb_codec::*;
let dict = k!(dict: k!(sym: vec!["x", "y"]) => k!(long: vec![10, 20]));
// Safe dictionary access
match dict.try_index(0) {
Ok(keys) => println!("Keys: {}", keys),
Err(e) => eprintln!("Error: {:?}", e),
}
// Try accessing out of bounds - won't panic
if dict.try_index(2).is_err() {
println!("Index 2 is out of bounds");
}
// Safe table column access
let table = k!(table: {
"name" => k!(sym: vec!["Alice", "Bob"])
});
match table.try_column("name") {
Ok(col) => println!("Names: {}", col),
Err(_) => println!("Column not found"),
}
// Check if column exists before accessing
if table.try_column("nonexistent").is_err() {
println!("Column 'nonexistent' not found");
}
Access elements in compound (heterogeneous) lists:
use kdb_codec::*;
let list = k!([
k!(long: 100),
k!(float: 3.14),
k!(sym: "hello"),
k!(bool: vec![true, false, true])
]);
// Safe access to list elements
if let Ok(first) = list.try_index(0) {
println!("First element: {}", first); // 100
}
if let Ok(second) = list.try_index(1) {
println!("Second element: {}", second); // 3.14
}
Benefits:
[] syntax familiar to Rust developers[]) and safe (try_*) variants availableSee examples/index_trait_demo.rs for more examples.
As for connect method, usually client interfaces of q/kdb+ do not provide a listener due to its protocol. However, sometimes Rust process is connecting to an upstream and q/kdb+ starts afterward or is restarted more frequently. Then providing a listener method is a natural direction and it was achieved here. Following ways are supported to connect to kdb+:
Furthermore, in order to improve inter-operatability some casting, getter and setter methods are provided.
This crate uses q-native or crate-specific environmental variables.
KDBPLUS_ACCOUNT_FILE: A file path to a credential file which an acceptor loads in order to manage access from a q client. This file contains a user name and SHA-1 hashed password in each line which are delimited by ':' without any space. For example, a file containing two credentials "mattew:oracle" and "reluctant:slowday" looks like this:
mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
The hashed password can be generated with q using a function .Q.sha1:
q).Q.sha1 "slowday"
0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
KDBPLUS_TLS_KEY_FILE and KDBPLUS_TLS_KEY_FILE_SECRET: The pkcs12 file and its password which TLS acceptor uses.
QUDSPATH (optional): q-native environmental variable to define an astract namespace. This environmental variable is used by UDS acceptor too. The abstract nameapace will be @${QUDSPATH}/kx.[server process port] if this environmental variable is defined; otherwise it will be @/tmp/kx.[server process port].
Notes:
KX_SSL_CERT_FILE and KX_SSL_KEY_FILE on q side to make q/kdb+ to work as a TLS server. For details, see the KX website.All types are expressed as K struct which is quite similar to the K struct of api module but its structure is optimized for IPC
usage and for the convenience to interact with. The table below shows the input types of each q type which is used to construct K object.
Note that the input type can be different from the inner type. For example, timestamp has an input type of chrono::DateTime<Utc> but
the inner type is i64 denoting an elapsed time in nanoseconds since 2000.01.01D00:00:00.
| q | Rust |
|---|---|
bool |
bool |
GUID |
[u8; 16] |
byte |
u8 |
short |
i16 |
int |
i32 |
long |
i64 |
real |
f32 |
float |
f64 |
char |
char |
symbol |
String |
timestamp |
chrono::DateTime<Utc> |
month |
chrono::NaiveDate |
date |
chrono::NaiveDate |
datetime |
chrono::DateTime<Utc> |
timespan |
chrono::Duration |
minute |
chrono::Duration |
second |
chrono::Duration |
time |
chrono::Duration |
list |
Vec<Item> (Item is a corrsponding type above) |
compound list |
Vec<K> |
table |
Vec<K> |
dictionary |
Vec<K> |
null |
() |
use kdb_codec::*;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {
// Connect to qprocess running on localhost:5000 via UDS
let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
println!("Connection type: {}", socket.get_connection_type());
// Set remote function with asynchronous message
socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;
// Send a query synchronously
let mut result = socket.send_sync_message(&"collatz[12]").await?;
println!("collatz[12]: {}", result);
result = socket.send_sync_message(&"collatz[`a]").await?;
println!("collatz[`a]: {}", result);
// Send a functional query.
let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
result = socket.send_sync_message(&message).await?;
println!("collatz[100]: {}", result);
// Modify query to (`collatz; 20)
message.pop().unwrap();
message.push(&K::new_long(20)).unwrap();
result=socket.send_sync_message(&message).await?;
println!("collatz[20]: {}", result);
// Send a functional asynchronous query.
message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
socket.send_async_message(&message).await?;
socket.shutdown().await?;
Ok(())
}
use kdb_codec::*;
#[tokio::main]
async fn main() -> Result<()> {
// Start listenening over TCP at the port 7000 with authentication enabled.
let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;
// Send a query with the socket.
let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
println!("Greeting: {}", greeting);
socket_tcp.shutdown().await?;
Ok(())
}
Then q client can connect to this acceptor with the acceptor's host, port and the credential configured in KDBPLUS_ACCOUNT_FILE:
q)h:hopen `::7000:reluctant:slowday
The core innovation of this library is its use of tokio-util::codec::Framed which provides automatic buffer management:
This is critical for production systems using patterns like:
tokio::select! for timeouts or concurrent operationsUnlike the original kdbplus crate, we use synchronous deserialization without async-recursion:
async-recursion proc-macro dependencyThe deserialization happens in deserialize_sync.rs and is called from the codec's decode() method after the complete message is buffered.
split() to QStream?While we show how to split the underlying Framed stream in examples, we don't recommend adding a split() method directly to QStream because:
Protocol Semantics: KDB+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.
Complexity: Users would need to manually track which response corresponds to which request.
Better Alternatives: For concurrent operations, use multiple QStream instances or the lower-level Framed API directly when you need full control.
If you need independent send/receive channels, access the underlying stream:
let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (writer, reader) = framed.split();
// Now you have full control
Add kdb_codec to your Cargo.toml:
[dependencies]
kdb_codec = "0.4"
The IPC feature is enabled by default.
Run the standard unit tests (no kdb+ server required):
cargo test --package kdb_codec --lib --tests
Some tests require a running kdb+ server and are marked as #[ignore] by default. To run these tests:
Start a kdb+ server on localhost:5001 with credentials kdbuser:pass:
q -p 5001 -u ./tests/test_kdb_passwd
Run the ignored tests:
cargo test --package kdb_codec --tests -- --ignored
cargo test --package kdb_codec --test e2e_acceptor -- e2e_q_script_to_rust_acceptor_echo_roundtrip --exact --nocapture --ignored
The integration tests include:
functional_message_test: Tests various message types and operationscompression_test: Tests compression functionality with large dataNote: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.
The full API documentation is available on docs.rs/kdb_codec.
For details of the kdb+ IPC protocol, see:
This library is licensed under Apache-2.0.
See LICENSE.