kdb_codec

Crates.iokdb_codec
lib.rskdb_codec
version1.1.0
created_at2025-12-26 04:14:06.318519+00
updated_at2026-01-13 18:28:32.691747+00
descriptionKdb+ IPC codec library for handling kdb+ wire protocol data with Rust.
homepage
repositoryhttps://github.com/yshing/kdb_codec
max_upload_size
id2005253
size713,037
Shing. (yshing)

documentation

https://docs.rs/kdb_codec

README

kdb_codec - Kdb+ IPC Codec Library

Tests

Cancellation-safe Rust codec + client for the kdb+ IPC wire protocol (q/kdb+).

Docs: https://yshing.github.io/kdb_codec/

Key features

  • Cancellation-safe message framing via tokio-util::codec::Framed
  • IPC encode/decode, including kdb+ compression (-18!/-19!)
  • Async client API (QStream) and lower-level codec API (KdbCodec)
  • Multiple connection methods: TCP / TLS / Unix Domain Socket
  • Ergonomic K value type for building/inspecting q objects

Quick start

use futures::{SinkExt, StreamExt};
use kdb_codec::*;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:5000").await?;
    let mut framed = Framed::new(stream, KdbCodec::new(true));

    let query = K::new_string("1+1".to_string(), 0);
    framed
        .send(KdbMessage::new(qmsg_type::synchronous, query))
        .await?;

    if let Some(Ok(resp)) = framed.next().await {
        println!("{}", resp.payload);
    }
    Ok(())
}

Datatype coverage (IPC)

This project prioritizes safety when decoding untrusted IPC bytes (no panics, no OOM).

  • Supported: basic atoms/lists (0–19), mixed lists, table (98), dictionary (99/127), null (101), error (-128)
  • Not supported: enums (20–76), nested/other types (77+), function/derived types (100–112), foreign (112) let mut dict = k!(dict: k!(sym: vec!["x"]) => k!(long: vec![42])); dict[1] = k!(long: vec![100]); // Replace values

### Table Column Access

Access table columns by name using string indices:

```rust
use kdb_codec::*;

// Create a table
let table = k!(table: {
    "fruit" => k!(sym: vec!["apple", "banana", "cherry"]),
    "price" => k!(float: vec![1.5, 2.3, 3.8]),
    "quantity" => k!(long: vec![100, 150, 75])
});

// Access columns by name
let fruits = &table["fruit"];
let prices = &table["price"];
let quantities = &table["quantity"];

println!("Fruits: {}", fruits);        // `apple`banana`cherry
println!("Prices: {}", prices);        // 1.5 2.3 3.8
println!("Quantities: {}", quantities); // 100 150 75

// Mutable access
let mut table = k!(table: {
    "price" => k!(float: vec![1.5, 2.3])
});
table["price"] = k!(float: vec![2.0, 2.5]);  // Update prices

Safe Access Methods

For production code, use the safe try_* methods that return Result instead of panicking:

use kdb_codec::*;

let dict = k!(dict: k!(sym: vec!["x", "y"]) => k!(long: vec![10, 20]));

// Safe dictionary access
match dict.try_index(0) {
    Ok(keys) => println!("Keys: {}", keys),
    Err(e) => eprintln!("Error: {:?}", e),
}

// Try accessing out of bounds - won't panic
if dict.try_index(2).is_err() {
    println!("Index 2 is out of bounds");
}

// Safe table column access
let table = k!(table: {
    "name" => k!(sym: vec!["Alice", "Bob"])
});

match table.try_column("name") {
    Ok(col) => println!("Names: {}", col),
    Err(_) => println!("Column not found"),
}

// Check if column exists before accessing
if table.try_column("nonexistent").is_err() {
    println!("Column 'nonexistent' not found");
}

Compound List Access

Access elements in compound (heterogeneous) lists:

use kdb_codec::*;

let list = k!([
    k!(long: 100),
    k!(float: 3.14),
    k!(sym: "hello"),
    k!(bool: vec![true, false, true])
]);

// Safe access to list elements
if let Ok(first) = list.try_index(0) {
    println!("First element: {}", first);  // 100
}

if let Ok(second) = list.try_index(1) {
    println!("Second element: {}", second); // 3.14
}

Benefits:

  • ✅ Ergonomic [] syntax familiar to Rust developers
  • ✅ Type-safe with compile-time borrow checking
  • ✅ Both panicking ([]) and safe (try_*) variants available
  • ✅ Works seamlessly with mutable access
  • ✅ Supports dictionaries, tables, and compound lists

See examples/index_trait_demo.rs for more examples.

Connection Methods

As for connect method, usually client interfaces of q/kdb+ do not provide a listener due to its protocol. However, sometimes Rust process is connecting to an upstream and q/kdb+ starts afterward or is restarted more frequently. Then providing a listener method is a natural direction and it was achieved here. Following ways are supported to connect to kdb+:

  • TCP
  • TLS
  • Unix domain socket

Furthermore, in order to improve inter-operatability some casting, getter and setter methods are provided.

Environmental Variables

This crate uses q-native or crate-specific environmental variables.

  • KDBPLUS_ACCOUNT_FILE: A file path to a credential file which an acceptor loads in order to manage access from a q client. This file contains a user name and SHA-1 hashed password in each line which are delimited by ':' without any space. For example, a file containing two credentials "mattew:oracle" and "reluctant:slowday" looks like this:

    mattew:431364b6450fc47ccdbf6a2205dfdb1baeb79412
    reluctant:d03f5cc1cdb11a77410ee34e26ca1102e67a893c
    
    

    The hashed password can be generated with q using a function .Q.sha1:

    q).Q.sha1 "slowday"
    0xd03f5cc1cdb11a77410ee34e26ca1102e67a893c
    
  • KDBPLUS_TLS_KEY_FILE and KDBPLUS_TLS_KEY_FILE_SECRET: The pkcs12 file and its password which TLS acceptor uses.

  • QUDSPATH (optional): q-native environmental variable to define an astract namespace. This environmental variable is used by UDS acceptor too. The abstract nameapace will be @${QUDSPATH}/kx.[server process port] if this environmental variable is defined; otherwise it will be @/tmp/kx.[server process port].

Notes:

  • Messages will be sent with OS native endian.
  • When using this crate for a TLS client you need to set two environmental variables KX_SSL_CERT_FILE and KX_SSL_KEY_FILE on q side to make q/kdb+ to work as a TLS server. For details, see the KX website.

Type Mapping

All types are expressed as K struct which is quite similar to the K struct of api module but its structure is optimized for IPC usage and for the convenience to interact with. The table below shows the input types of each q type which is used to construct K object. Note that the input type can be different from the inner type. For example, timestamp has an input type of chrono::DateTime<Utc> but the inner type is i64 denoting an elapsed time in nanoseconds since 2000.01.01D00:00:00.

q Rust
bool bool
GUID [u8; 16]
byte u8
short i16
int i32
long i64
real f32
float f64
char char
symbol String
timestamp chrono::DateTime<Utc>
month chrono::NaiveDate
date chrono::NaiveDate
datetime chrono::DateTime<Utc>
timespan chrono::Duration
minute chrono::Duration
second chrono::Duration
time chrono::Duration
list Vec<Item> (Item is a corrsponding type above)
compound list Vec<K>
table Vec<K>
dictionary Vec<K>
null ()

Examples

Client

use kdb_codec::*;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<()> {

    // Connect to qprocess running on localhost:5000 via UDS
    let mut socket = QStream::connect(ConnectionMethod::UDS, "", 5000_u16, "ideal:person").await?;
    println!("Connection type: {}", socket.get_connection_type());

    // Set remote function with asynchronous message
    socket.send_async_message(&"collatz:{[n] seq:enlist n; while[not n = 1; seq,: n:$[n mod 2; 1 + 3 * n; `long$n % 2]]; seq}").await?;

    // Send a query synchronously
    let mut result = socket.send_sync_message(&"collatz[12]").await?;
    println!("collatz[12]: {}", result);

    result = socket.send_sync_message(&"collatz[`a]").await?;
    println!("collatz[`a]: {}", result);

    // Send a functional query.
    let mut message = K::new_compound_list(vec![K::new_symbol(String::from("collatz")), K::new_long(100)]);
    result = socket.send_sync_message(&message).await?;
    println!("collatz[100]: {}", result);

    // Modify query to (`collatz; 20)
    message.pop().unwrap();
    message.push(&K::new_long(20)).unwrap();
    result=socket.send_sync_message(&message).await?;
    println!("collatz[20]: {}", result);

    // Send a functional asynchronous query.
    message = K::new_compound_list(vec![K::new_string(String::from("show"), qattribute::NONE), K::new_symbol(String::from("goodbye"))]);
    socket.send_async_message(&message).await?;

    socket.shutdown().await?;

    Ok(())
}

Listener

use kdb_codec::*;

#[tokio::main]
async fn main() -> Result<()> {

  // Start listenening over TCP at the port 7000 with authentication enabled.
  let mut socket_tcp = QStream::accept(ConnectionMethod::TCP, "127.0.0.1", 7000).await?;

  // Send a query with the socket.
  let greeting = socket_tcp.send_sync_message(&"string `Hello").await?;
  println!("Greeting: {}", greeting);

  socket_tcp.shutdown().await?;

  Ok(())
}

Then q client can connect to this acceptor with the acceptor's host, port and the credential configured in KDBPLUS_ACCOUNT_FILE:

q)h:hopen `::7000:reluctant:slowday

Architecture & Design

Cancellation Safety

The core innovation of this library is its use of tokio-util::codec::Framed which provides automatic buffer management:

  • Buffer Preservation: Partial reads are stored in the codec's internal buffer
  • Resumable Operations: Cancelled reads can be safely retried without data loss
  • No Manual State Management: The Framed wrapper handles all buffer lifecycle

This is critical for production systems using patterns like:

  • tokio::select! for timeouts or concurrent operations
  • Graceful shutdown with cancellation
  • Request racing or fallback logic

Synchronous Deserialization

Unlike the original kdbplus crate, we use synchronous deserialization without async-recursion:

  • Simpler: No async recursion complexity
  • Faster: Eliminates async overhead for CPU-bound deserialization
  • Smaller: No async-recursion proc-macro dependency
  • Safer: Avoids potential stack overflow from deep async recursion

The deserialization happens in deserialize_sync.rs and is called from the codec's decode() method after the complete message is buffered.

Why Not Add split() to QStream?

While we show how to split the underlying Framed stream in examples, we don't recommend adding a split() method directly to QStream because:

  1. Protocol Semantics: KDB+ IPC is request-response oriented. Splitting would allow sending multiple requests before receiving responses, which can confuse message correlation.

  2. Complexity: Users would need to manually track which response corresponds to which request.

  3. Better Alternatives: For concurrent operations, use multiple QStream instances or the lower-level Framed API directly when you need full control.

If you need independent send/receive channels, access the underlying stream:

let stream = TcpStream::connect("127.0.0.1:5000").await?;
let framed = Framed::new(stream, KdbCodec::new(true));
let (writer, reader) = framed.split();
// Now you have full control

Installation

Add kdb_codec to your Cargo.toml:

[dependencies]
kdb_codec = "0.4"

The IPC feature is enabled by default.

Testing

Unit Tests

Run the standard unit tests (no kdb+ server required):

cargo test --package kdb_codec --lib --tests

Integration Tests

Some tests require a running kdb+ server and are marked as #[ignore] by default. To run these tests:

  1. Start a kdb+ server on localhost:5001 with credentials kdbuser:pass:

    q -p 5001 -u ./tests/test_kdb_passwd
    
  2. Run the ignored tests:

    cargo test --package kdb_codec --tests -- --ignored
    cargo test --package kdb_codec --test e2e_acceptor -- e2e_q_script_to_rust_acceptor_echo_roundtrip --exact --nocapture --ignored
    

The integration tests include:

  • functional_message_test: Tests various message types and operations
  • compression_test: Tests compression functionality with large data

Note: These tests are automatically skipped in CI/CD unless a kdb+ server is explicitly configured.

Documentation

The full API documentation is available on docs.rs/kdb_codec.

For details of the kdb+ IPC protocol, see:

License

This library is licensed under Apache-2.0.

See LICENSE.

Commit count: 130

cargo fmt