| Crates.io | khonsu |
| lib.rs | khonsu |
| version | 0.1.0 |
| created_at | 2025-05-13 13:43:52.263981+00 |
| updated_at | 2025-05-13 13:43:52.263981+00 |
| description | Khonsu: Software Transactional Memory in Rust for Apache Arrow |
| homepage | https://github.com/psila-ai/khonsu |
| repository | https://github.com/psila-ai/khonsu |
| max_upload_size | |
| id | 1671932 |
| size | 15,026 |
A high-performance Software Transactional Memory (STM) library for Rust, designed for concurrent data access and manipulation using Arrow RecordBatches. Khonsu supports both standalone and distributed operation modes.
To use Khonsu in your Rust project, add it as a dependency in your Cargo.toml:
# Basic usage (without distributed features)
[dependencies]
khonsu = { git = "https://github.com/psila-ai/khonsu.git" }
# With distributed features enabled
[dependencies]
khonsu = { git = "https://github.com/psila-ai/khonsu.git", features = ["distributed"] }
Here's a small example demonstrating basic usage without distributed features:
use khonsu::prelude::*;
use std::sync::Arc;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
// Define a simple in-memory storage implementation
#[derive(Default)]
struct MockStorage;
impl Storage for MockStorage {
fn apply_mutations(&self, _mutations: Vec<StorageMutation>) -> Result<()> {
// In a real implementation, this would write to durable storage
Ok(())
}
}
fn main() -> Result<()> {
// 1. Set up Khonsu with a storage implementation
let storage = Arc::new(MockStorage::default());
let khonsu = Khonsu::new(
storage,
TransactionIsolation::Serializable, // Choose your desired isolation level
ConflictResolution::Fail, // Choose your desired conflict resolution
);
// Define a simple schema and record batch
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int64, false),
]));
let id_array = Arc::new(StringArray::from(vec!["key1"]));
let value_array = Arc::new(Int64Array::from(vec![100]));
let record_batch =
RecordBatch::try_new(schema.clone(), vec![id_array, value_array]).unwrap();
// 2. Start a transaction
let mut txn = khonsu.start_transaction();
// 3. Perform operations within the transaction
txn.write("my_data_key".to_string(), record_batch.clone())?;
println!("Wrote data in transaction {}", txn.id());
// 4. Attempt to commit the transaction
match txn.commit() {
Ok(()) => println!("Transaction {} committed successfully.", txn.id()),
Err(e) => {
eprintln!("Transaction {} failed to commit: {:?}", txn.id(), e);
// Handle conflict or other error
// Note: Rollback is automatic on drop if commit fails
}
}
// 5. Read the data in a new transaction
let mut read_txn = khonsu.start_transaction();
if let Some(read_batch) = read_txn.read(&"my_data_key".to_string())? {
println!("Read data in transaction {}: {:?}", read_txn.id(), read_batch);
assert_eq!(*read_batch, record_batch); // Verify read data matches original
} else {
println!("Data not found in transaction {}", read_txn.id());
}
Ok(())
}
When the distributed feature is enabled, Khonsu can operate in a distributed mode across multiple nodes. Here's how to set up and use Khonsu in a distributed environment:
use khonsu::prelude::*;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
// Define a simple storage implementation
#[derive(Default)]
struct MockStorage;
impl Storage for MockStorage {
fn apply_mutations(&self, _mutations: Vec<StorageMutation>) -> Result<()> {
// In a real implementation, this would write to durable storage
Ok(())
}
}
fn main() -> Result<()> {
// 1. Set up a distributed Khonsu node
// Define the node's identity and cluster configuration
let node_id = 1; // This node's ID
let cluster_config = ClusterConfig {
configuration_id: 1,
nodes: vec![1, 2, 3], // A 3-node cluster
flexible_quorum: None,
};
// Define peer addresses for gRPC communication
let mut peer_addrs = HashMap::new();
peer_addrs.insert(2, "127.0.0.1:50053".to_string()); // Node 2 uses port 50053 (50051 + 2)
peer_addrs.insert(3, "127.0.0.1:50054".to_string()); // Node 3 uses port 50054 (50051 + 3)
// Create a storage path for the distributed commit log
let storage_path = PathBuf::from("/tmp/khonsu-node1");
// Create the distributed configuration
let dist_config = KhonsuDistConfig {
node_id,
cluster_config,
peer_addrs,
storage_path,
};
// Create the Khonsu instance with distributed capabilities
let storage = Arc::new(MockStorage::default());
let khonsu = Khonsu::new(
storage,
TransactionIsolation::ReadCommitted, // Only ReadCommitted is fully supported in distributed mode
ConflictResolution::Fail,
Some(dist_config), // Pass the distributed configuration
);
// 2. Create a record batch to store
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int64, false),
]));
let id_array = Arc::new(StringArray::from(vec!["key1"]));
let value_array = Arc::new(Int64Array::from(vec![100]));
let record_batch =
RecordBatch::try_new(schema.clone(), vec![id_array, value_array]).unwrap();
// 3. Start a distributed transaction
let mut txn = khonsu.start_transaction();
// 4. Perform operations within the transaction
txn.write("distributed_key".to_string(), record_batch.clone())?;
println!("Wrote data in distributed transaction {}", txn.id());
// 5. Commit the transaction - this will be replicated to all nodes in the cluster
match txn.commit() {
Ok(()) => println!("Transaction {} committed and replicated to all nodes", txn.id()),
Err(e) => {
eprintln!("Transaction {} failed: {:?}", txn.id(), e);
// Handle distributed commit error
}
}
// 6. Read the data in a new transaction
let mut read_txn = khonsu.start_transaction();
if let Some(read_batch) = read_txn.read(&"distributed_key".to_string())? {
println!("Read data from distributed storage: {:?}", read_batch);
} else {
println!("Data not found in distributed storage");
}
Ok(())
}
When operating in distributed mode, Khonsu uses a combination of techniques to ensure data consistency across nodes:
Each node in the Khonsu cluster requires a unique node ID and communicates over gRPC. The port allocation follows a simple convention:
For example:
When configuring peer addresses in your distributed setup, you need to specify the full address including the calculated port:
let mut peer_addrs = HashMap::new();
peer_addrs.insert(2, "127.0.0.1:50053".to_string()); // Node 2 uses port 50053
peer_addrs.insert(3, "127.0.0.1:50054".to_string()); // Node 3 uses port 50054
This deterministic port allocation scheme ensures that each node in the cluster gets a unique port without requiring complex port negotiation.
While Khonsu supports multiple isolation levels, there are some considerations for distributed operation:
Khonsu's distributed mode is designed to be crash-resistant. When a node crashes and restarts:
To build and test Khonsu:
# Build with default features (no distributed capabilities)
cargo build
# Build with distributed features
cargo build --features distributed
# Run tests (single-threaded to avoid test interference)
cargo test -- --test-threads=1
# Run distributed tests
cargo test --features distributed -- --test-threads=1
Below are the research papers that inspired Khonsu, along with the various serializable checking mechanisms implemented within it.
@inproceedings{10.1145/3492321.3519561,
author = {Shen, Weihai and Khanna, Ansh and Angel, Sebastian and Sen, Siddhartha and Mu, Shuai},
title = {Rolis: a software approach to efficiently replicating multi-core transactions},
year = {2022},
isbn = {9781450391627},
publisher = {Association for Computing Machinery},
address = {New York, NY, USA},
url = {https://doi.org/10.1145/3492321.3519561},
doi = {10.1145/3492321.3519561},
booktitle = {Proceedings of the Seventeenth European Conference on Computer Systems},
pages = {69–84},
numpages = {16},
keywords = {concurrency, distributed systems, multicore},
location = {Rennes, France},
series = {EuroSys '22}
}
@inproceedings{bailis2014highly,
title={Highly available transactions: Virtues and limitations},
author={Bailis, Peter and Ghodsi, Ali and Hellerstein, Joseph M and Stoica, Ion},
booktitle={Proceedings of the VLDB Endowment},
volume={7},
number={3},
pages={245--256},
year={2014},
organization={VLDB Endowment}
}
@article{fekete2005serializable,
title={Serializable isolation for snapshot databases},
author={Fekete, Alan and Greenwood, David and Kingston, Maurice and Rice, Jeff and Storage, Andrew},
journal={Proc. 29th VLDB Endowment},
volume={32},
pages={12},
year={2005}
}
@article{herlihy2003composable,
title={Composable memory transactions},
author={Herlihy, Maurice and Luchangco, Victor and Moir, Mark and Scherer, William N},
journal={ACM SIGPLAN Notices},
volume={38},
number={10},
pages={80--96},
year={2003},
publisher={ACM}
}
All rights reserved.