| Crates.io | arrow-zerobus-sdk-wrapper |
| lib.rs | arrow-zerobus-sdk-wrapper |
| version | 0.8.1 |
| created_at | 2025-12-11 12:04:50.367895+00 |
| updated_at | 2025-12-12 12:42:16.046133+00 |
| description | Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings |
| homepage | |
| repository | https://github.com/pixie79/arrow-zerobus-sdk-wrapper |
| max_upload_size | |
| id | 1979557 |
| size | 3,168,016 |
Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings. Provides a unified API for sending Arrow RecordBatch data to Zerobus with automatic protocol conversion, authentication, retry logic, and observability.
Add to your Cargo.toml:
[dependencies]
arrow-zerobus-sdk-wrapper = "0.8"
arrow = "57"
tokio = { version = "1.35", features = ["full"] }
For Python, install from PyPI:
pip install arrow-zerobus-sdk-wrapper
Or from source:
pip install -e /path/to/arrow-zerobus-sdk-wrapper
Rust quick start:
use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
use arrow::record_batch::RecordBatch;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = WrapperConfiguration::new(
"https://your-workspace.cloud.databricks.com".to_string(),
"my_table".to_string(),
)
.with_credentials("client_id".to_string(), "client_secret".to_string())
.with_unity_catalog("https://unity-catalog-url".to_string());
let wrapper = ZerobusWrapper::new(config).await?;
let batch = create_record_batch()?;
let result = wrapper.send_batch(batch).await?;
if result.success {
println!("Batch sent successfully!");
// Handle per-row errors (if any)
if result.is_partial_success() {
println!("⚠️ Partial success: {} succeeded, {} failed",
result.successful_count, result.failed_count);
// Extract and quarantine failed rows
if let Some(failed_batch) = result.extract_failed_batch(&batch) {
// Quarantine failed_batch
println!("Quarantining {} failed rows", failed_batch.num_rows());
}
// Extract and write successful rows
if let Some(successful_batch) = result.extract_successful_batch(&batch) {
// Write successful_batch to main table
println!("Writing {} successful rows", successful_batch.num_rows());
}
}
// Analyze error patterns
if result.has_failed_rows() {
let stats = result.get_error_statistics();
println!("Success rate: {:.1}%", stats.success_rate * 100.0);
let grouped = result.group_errors_by_type();
for (error_type, indices) in &grouped {
println!(" {}: {} rows", error_type, indices.len());
}
}
}
wrapper.shutdown().await?;
Ok(())
}
Python quick start (see examples/python_example.py for a complete example):
import asyncio
import pyarrow as pa
from arrow_zerobus_sdk_wrapper import ZerobusWrapper
async def main():
wrapper = ZerobusWrapper(
endpoint="https://your-workspace.cloud.databricks.com",
table_name="my_table",
client_id="client_id",
client_secret="client_secret",
unity_catalog_url="https://unity-catalog-url",
)
# Create Arrow RecordBatch
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
])
arrays = [
pa.array([1, 2, 3], type=pa.int64()),
pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
]
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
# Send batch
result = await wrapper.send_batch(batch)
if result.success:
print("Batch sent successfully!")
# Handle per-row errors (if any)
if result.is_partial_success():
print(f"⚠️ Partial success: {result.successful_count} succeeded, {result.failed_count} failed")
# Extract and quarantine failed rows
failed_batch = result.extract_failed_batch(batch)
if failed_batch is not None:
print(f"Quarantining {failed_batch.num_rows} failed rows")
# Extract and write successful rows
successful_batch = result.extract_successful_batch(batch)
if successful_batch is not None:
print(f"Writing {successful_batch.num_rows} successful rows")
# Analyze error patterns
if result.has_failed_rows():
stats = result.get_error_statistics()
print(f"Success rate: {stats['success_rate'] * 100:.1}%")
grouped = result.group_errors_by_type()
for error_type, indices in grouped.items():
print(f" {error_type}: {len(indices)} rows")
await wrapper.shutdown()
asyncio.run(main())
The wrapper supports a "writer disabled" mode that allows you to test data conversion logic and write debug files without making network calls to Zerobus. This is useful for:
Rust:
use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
use std::path::PathBuf;
let config = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"my_table".to_string(),
)
.with_debug_output(PathBuf::from("./debug_output"))
.with_zerobus_writer_disabled(true); // Enable disabled mode
let wrapper = ZerobusWrapper::new(config).await?;
// No credentials required when writer is disabled
let result = wrapper.send_batch(batch).await?;
// Debug files written, no network calls made
Python:
wrapper = ZerobusWrapper(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_enabled=True,
debug_output_dir="./debug_output",
zerobus_writer_disabled=True, # Enable disabled mode
# No credentials required when writer is disabled
)
result = await wrapper.send_batch(batch)
# Debug files written, no network calls made
Note: When zerobus_writer_disabled is true, at least one debug format must be enabled. Credentials are optional when writer is disabled.
The wrapper supports flexible debug output configuration with independent control over Arrow and Protobuf file generation, automatic file retention, and improved file rotation.
You can enable Arrow and Protobuf debug output independently:
Rust:
use arrow_zerobus_sdk_wrapper::{ZerobusWrapper, WrapperConfiguration};
use std::path::PathBuf;
// Enable only Arrow debug files
let config = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"my_table".to_string(),
)
.with_debug_arrow_enabled(true) // Enable Arrow files (.arrows)
.with_debug_protobuf_enabled(false) // Disable Protobuf files
.with_debug_output(PathBuf::from("./debug_output"));
// Enable only Protobuf debug files
let config = WrapperConfiguration::new(...)
.with_debug_arrow_enabled(false)
.with_debug_protobuf_enabled(true) // Enable Protobuf files (.proto)
.with_debug_output(PathBuf::from("./debug_output"));
// Enable both formats
let config = WrapperConfiguration::new(...)
.with_debug_arrow_enabled(true)
.with_debug_protobuf_enabled(true)
.with_debug_output(PathBuf::from("./debug_output"));
Python:
# Enable only Arrow debug files
config = PyWrapperConfiguration(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_arrow_enabled=True, # Enable Arrow files
debug_protobuf_enabled=False, # Disable Protobuf files
debug_output_dir="./debug_output"
)
# Enable only Protobuf debug files
config = PyWrapperConfiguration(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_arrow_enabled=False,
debug_protobuf_enabled=True, # Enable Protobuf files
debug_output_dir="./debug_output"
)
Control how many rotated debug files are retained to manage disk space:
Rust:
// Keep last 20 rotated files per type (default: 10)
let config = WrapperConfiguration::new(...)
.with_debug_arrow_enabled(true)
.with_debug_output(PathBuf::from("./debug_output"))
.with_debug_max_files_retained(Some(20));
// Unlimited retention (no automatic cleanup)
let config = WrapperConfiguration::new(...)
.with_debug_arrow_enabled(true)
.with_debug_output(PathBuf::from("./debug_output"))
.with_debug_max_files_retained(None);
Python:
# Keep last 20 rotated files per type
config = PyWrapperConfiguration(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_arrow_enabled=True,
debug_output_dir="./debug_output",
debug_max_files_retained=20 # Default: 10, None = unlimited
)
# Unlimited retention
config = PyWrapperConfiguration(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_arrow_enabled=True,
debug_output_dir="./debug_output",
debug_max_files_retained=None # No automatic cleanup
)
The same debug options can be set in a YAML configuration file:
debug:
arrow_enabled: true # Enable Arrow debug files
protobuf_enabled: false # Disable Protobuf debug files
output_dir: "/tmp/debug"
max_files_retained: 20 # Keep last 20 rotated files (default: 10)
flush_interval_secs: 5
max_file_size: 10485760
Or via environment variables:
export DEBUG_ARROW_ENABLED=true
export DEBUG_PROTOBUF_ENABLED=false
export DEBUG_OUTPUT_DIR=/tmp/debug
export DEBUG_MAX_FILES_RETAINED=20
export DEBUG_FLUSH_INTERVAL_SECS=5
The legacy debug_enabled flag still works. When set to true and new flags are not explicitly set, both Arrow and Protobuf formats are enabled:
// Legacy Rust code - still works; with debug_enabled set to true, both formats are enabled
let config = WrapperConfiguration::new(...)
.with_debug_output(PathBuf::from("./debug_output"));
// Note: debug_enabled must be set to true explicitly; with_debug_output() does not enable debugging by itself
# Legacy Python code - still works
config = PyWrapperConfiguration(
endpoint="https://workspace.cloud.databricks.com",
table_name="my_table",
debug_enabled=True, # Enables both Arrow and Protobuf when new flags not set
debug_output_dir="./debug_output"
)
File rotation has been improved to prevent recursive timestamp appending and filename length errors:
Previously, each rotation could append another timestamp to an already-timestamped filename (e.g. file_20250101_120000_20250101_120001), eventually hitting filename length limits; an incrementing numeric suffix (_1, _2, etc.) is now used instead of timestamps.

The wrapper automatically validates and enforces Zerobus service limits to prevent API errors and ensure compatibility:
// Records exceeding 4MB will be rejected with a clear error
let result = wrapper.send_batch(large_batch).await?;
if !result.success {
// Check for size limit errors in failed_rows
for (idx, error) in &result.failed_rows {
if let ZerobusError::ConversionError(msg) = error {
if msg.contains("exceeds Zerobus limit") {
println!("Row {} exceeds 4MB limit", idx);
}
}
}
}
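If you prefer to catch oversized records before calling send_batch, one rough client-side pre-check is to slice the batch row by row and compare each slice's in-memory size against the 4MB limit. This is only a sketch: the constant and helper below are illustrative, not part of the wrapper's API, and Arrow buffer sizes only approximate the size of the converted Protobuf record.

```python
import pyarrow as pa

ZEROBUS_RECORD_LIMIT = 4 * 1024 * 1024  # 4MB, per the limit described above

def rows_over_limit(batch: pa.RecordBatch) -> list[int]:
    """Return indices of rows whose approximate in-memory size exceeds the limit.

    batch.slice(i, 1).nbytes measures Arrow buffer sizes, which only
    approximates the size of the converted Protobuf record.
    """
    return [
        i for i in range(batch.num_rows)
        if batch.slice(i, 1).nbytes > ZEROBUS_RECORD_LIMIT
    ]

# Example: detect potentially oversized rows before sending
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b" * 100, "c"])],
    names=["id", "payload"],
)
oversized = rows_over_limit(batch)
if oversized:
    print(f"{len(oversized)} rows may exceed the Zerobus record limit: {oversized}")
```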
Table names may be given in any of these formats: table (simple name), schema.table (2-part name), or catalog.schema.table (3-part name). Only dots (.) are allowed as separators between parts.
// Valid Unity Catalog table names
let config1 = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"my_table".to_string(), // ✅ Valid: simple table name
);
let config2 = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"my_schema.my_table".to_string(), // ✅ Valid: schema.table format
);
let config3 = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"my_catalog.my_schema.my_table".to_string(), // ✅ Valid: catalog.schema.table format
);
// Invalid table names will be rejected
let config4 = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"table-name".to_string(), // ❌ Invalid: hyphen in part
);
let config5 = WrapperConfiguration::new(
"https://workspace.cloud.databricks.com".to_string(),
"schema..table".to_string(), // ❌ Invalid: double dot
);
let result = config4.validate();
// Returns ConfigurationError with specific part that failed validation
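To fail fast before constructing a wrapper, you can mirror these rules in a small client-side pre-check. The helper below is a sketch only (the wrapper's own validate() remains the source of truth, and its character rules may be stricter than this regex):

```python
import re

# Hypothetical pre-check; config.validate() remains authoritative.
_PART = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def looks_like_valid_table_name(name: str) -> bool:
    parts = name.split(".")
    return 1 <= len(parts) <= 3 and all(_PART.match(p) for p in parts)

assert looks_like_valid_table_name("my_table")
assert looks_like_valid_table_name("my_catalog.my_schema.my_table")
assert not looks_like_valid_table_name("table-name")    # hyphen in part
assert not looks_like_valid_table_name("schema..table")  # empty part / double dot
```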
The wrapper ensures correct type mappings per Zerobus specification:
Date32 → Int32 (days since epoch) ✅
Timestamp (milliseconds) → Int64 (milliseconds since epoch)
Timestamp (microseconds) → Int64 (microseconds since epoch) ✅
Integer types → Int32 or Int64 as appropriate ✅
Utf8 → String ✅
Binary → Bytes ✅
List → repeated TYPE ✅
Struct → message Nested { FIELDS } ✅
All type mappings are validated to ensure compatibility with Zerobus requirements.
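For example, a pyarrow schema built from types covered by this mapping (field names are illustrative) could look like:

```python
import pyarrow as pa

# Illustrative schema using Arrow types from the mapping above
schema = pa.schema([
    pa.field("event_date", pa.date32()),         # days since epoch -> Int32
    pa.field("event_time", pa.timestamp("us")),   # microseconds -> Int64
    pa.field("count", pa.int64()),                # integer -> Int64
    pa.field("name", pa.string()),                # -> String
    pa.field("payload", pa.binary()),             # -> Bytes
    pa.field("tags", pa.list_(pa.string())),      # -> repeated String
    pa.field("address", pa.struct([               # -> nested message
        pa.field("city", pa.string()),
        pa.field("zip", pa.string()),
    ])),
])
print(schema)
```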
To build the Rust library:
cargo build --release
To build and install the Python wheel:
pip install maturin
maturin build --release
pip install target/wheels/*.whl
Note: When running tests with the python feature enabled, PyO3 needs to link against the Python library. You can either:
Use the helper script (recommended):
./scripts/test.sh --all-features
Set PYO3_PYTHON manually:
# Find your Python executable
which python3.11 || which python3
# Run tests with Python feature
PYO3_PYTHON=/path/to/python3 cargo test --all-features
The helper script automatically detects and uses Python 3.11+ from your PATH.
# Run all tests (without Python feature)
cargo test
# Run all tests including Python bindings
./scripts/test.sh --all-features
# Run with coverage (requires cargo-tarpaulin)
cargo tarpaulin --out Xml --output-dir coverage
Note: Python tests require the Python extension to be built first. The tests use pytest-forked to work around PyO3 GIL issues that can cause pytest to hang after tests.
# Recommended: Use the helper script (handles setup automatically)
./scripts/test-python.sh
# Manual setup:
# 1. Build Python extension
maturin develop --release
# 2. Install test dependencies
pip install pytest pytest-cov pytest-forked
# 3. Run tests with PyO3 workaround
export PYO3_NO_PYTHON_VERSION_CHECK=1
pytest tests/python/ -v --forked
# Run with coverage
pytest --cov=arrow_zerobus_sdk_wrapper tests/python/ --forked
PyO3 Pytest Workaround: The --forked flag ensures each test runs in a separate process, preventing GIL (Global Interpreter Lock) deadlocks that can cause pytest to hang. The conftest.py file includes additional fixtures to ensure proper Python initialization and cleanup.
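For reference, a hypothetical test along these lines (the file name, options, and assertions are illustrative, not taken from the test suite) exercises the bindings in writer-disabled mode so no credentials or network access are needed; run it with pytest --forked as described above:

```python
# tests/python/test_writer_disabled.py (hypothetical sketch)
import asyncio
import pyarrow as pa
from arrow_zerobus_sdk_wrapper import ZerobusWrapper

def test_send_batch_writer_disabled(tmp_path):
    async def run():
        # Writer-disabled mode: debug files only, no credentials, no network calls
        wrapper = ZerobusWrapper(
            endpoint="https://workspace.cloud.databricks.com",
            table_name="my_table",
            debug_enabled=True,
            debug_output_dir=str(tmp_path),
            zerobus_writer_disabled=True,
        )
        batch = pa.RecordBatch.from_arrays(
            [pa.array([1, 2, 3], type=pa.int64())],
            schema=pa.schema([pa.field("id", pa.int64())]),
        )
        result = await wrapper.send_batch(batch)
        assert result.success
        await wrapper.shutdown()

    asyncio.run(run())
```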
# Run latency benchmarks
cargo bench --bench latency
# Run throughput benchmarks
cargo bench --bench throughput
When debug output is enabled, the wrapper writes Arrow and Protobuf files to disk for inspection. The Arrow files can be read and analyzed with DuckDB via its Arrow IPC support.
First, install and load the DuckDB Arrow community extension:
INSTALL arrow FROM community;
LOAD arrow;
Arrow files are written in Arrow IPC stream format (.arrow or .arrows extension) and can be read directly by DuckDB:
-- Read Arrow IPC file using read_arrow() function
SELECT * FROM read_arrow('debug_output/zerobus/arrow/table.arrow');
-- DuckDB supports replacement scans - you can omit read_arrow()
-- if the filename ends with .arrow or .arrows
SELECT * FROM 'debug_output/zerobus/arrow/table.arrow';
-- Read multiple Arrow files (including rotated files)
SELECT * FROM read_arrow('debug_output/zerobus/arrow/*.arrow');
-- Query specific columns
SELECT id, name, score
FROM 'debug_output/zerobus/arrow/table.arrow'
WHERE score > 90;
-- Aggregate data
SELECT
COUNT(*) as total_rows,
AVG(score) as avg_score,
MAX(score) as max_score
FROM 'debug_output/zerobus/arrow/table.arrow';
You can also read Arrow IPC files over HTTP using DuckDB's httpfs extension:
INSTALL httpfs;
LOAD httpfs;
LOAD arrow;
-- Read Arrow IPC file from HTTP server
SELECT * FROM read_arrow('http://localhost:8008/table.arrow');
You can pipe Arrow IPC data directly to DuckDB:
# Using curl to fetch and pipe to DuckDB
URL="http://localhost:8008/table.arrow"
SQL="LOAD arrow; FROM read_arrow('/dev/stdin') SELECT count(*);"
curl -s "$URL" | duckdb -c "$SQL"
-- Load the Arrow extension
LOAD arrow;
-- Read all Arrow files from debug output
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT file_name) as num_files
FROM read_arrow('debug_output/zerobus/arrow/*.arrow');
-- Analyze specific data
SELECT
id,
name,
score,
COUNT(*) OVER () as total_count
FROM 'debug_output/zerobus/arrow/table.arrow'
WHERE score > 90
ORDER BY score DESC;
The same queries can be run from Python using the duckdb package:
import duckdb
# Connect to DuckDB
conn = duckdb.connect()
# Install and load Arrow extension
conn.execute("INSTALL arrow FROM community")
conn.execute("LOAD arrow")
# Read Arrow debug files (using replacement scan)
result = conn.execute("""
SELECT *
FROM 'debug_output/zerobus/arrow/table.arrow'
LIMIT 100
""").fetchdf()
print(result)
# Analyze data across multiple files
stats = conn.execute("""
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT file_name) as num_files
FROM read_arrow('debug_output/zerobus/arrow/*.arrow')
""").fetchdf()
print(stats)
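If you just want the data back in Python without SQL, the same Arrow IPC stream files can also be opened directly with pyarrow (the path below is illustrative):

```python
import pyarrow.ipc as ipc

# Read an Arrow IPC stream file written by the debug output
with open("debug_output/zerobus/arrow/table.arrow", "rb") as f:
    reader = ipc.open_stream(f)
    table = reader.read_all()  # concatenate all record batches into a Table

print(table.num_rows)
print(table.schema)
```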
Protobuf files contain binary Protobuf messages, which DuckDB cannot read directly.
For Protobuf files, you can use Python to convert to Arrow IPC format, then read with DuckDB:
import duckdb
import pyarrow as pa
from google.protobuf.message import Message
# Read Protobuf file and convert to Arrow IPC
# (This requires knowledge of your Protobuf schema)
def protobuf_to_arrow_ipc(proto_file, schema):
# Parse Protobuf messages and convert to Arrow
# Then write as Arrow IPC format
# Implementation depends on your specific Protobuf schema
pass
# Convert Protobuf to Arrow IPC file
protobuf_to_arrow_ipc('debug_output/zerobus/proto/table.proto', schema)
# Then use DuckDB to read the converted Arrow IPC file
conn = duckdb.connect()
conn.execute("INSTALL arrow FROM community")
conn.execute("LOAD arrow")
result = conn.execute("SELECT * FROM 'converted_table.arrow'").fetchdf()
Alternatively, if you have the Protobuf schema definition, you can use tools like protoc to generate code for parsing, then convert to Arrow IPC format that DuckDB can read.
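As a sketch of that last step: once your Protobuf messages are decoded into plain Python dicts (the decoding itself depends on your schema and is not shown), writing them out as an Arrow IPC stream that DuckDB can read is straightforward. The helper and file names below are illustrative:

```python
import pyarrow as pa
import pyarrow.ipc as ipc

def rows_to_arrow_ipc(rows, schema, out_path):
    """Write a list of dicts (keyed by field name) as an Arrow IPC stream file."""
    arrays = [pa.array([row[field.name] for row in rows], type=field.type)
              for field in schema]
    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
    with pa.OSFile(out_path, "wb") as sink:
        with ipc.new_stream(sink, schema) as writer:
            writer.write_batch(batch)

schema = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
rows = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
rows_to_arrow_ipc(rows, schema, "converted_table.arrow")
# Then in DuckDB (with the arrow extension loaded):
#   SELECT * FROM 'converted_table.arrow';
```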
Notes on reading debug files:
Arrow debug files are written in the Arrow IPC stream format (.arrows)
DuckDB can read files with .arrow or .arrows extensions directly (replacement scans)
Glob patterns (*.arrow) can be used to query multiple files at once
Rotated files (e.g. table_20251212_143022.arrows) can be read using glob patterns
Retention is controlled by the debug_max_files_retained setting (default: 10 files per type)
Call wrapper.flush() before reading to make sure buffered data has been written
For more details, see the official DuckDB Arrow IPC support documentation.
The repository includes a pre-commit hook to ensure version consistency across all configuration files. Before each commit, the hook verifies that version numbers match in:
Cargo.toml
pyproject.toml
CHANGELOG.md (latest release)
To install the pre-commit hook:
./scripts/install_pre_commit_hook.sh
To manually check version consistency:
./scripts/check_version.sh
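For illustration only (this is not the repository's actual script), a minimal consistency check could compare the version strings in Cargo.toml, pyproject.toml, and the latest CHANGELOG.md entry:

```python
# Hypothetical sketch of a version-consistency check (Python 3.11+ for tomllib)
import re
import tomllib

def read_version(path: str) -> str:
    with open(path, "rb") as f:
        data = tomllib.load(f)
    # Cargo.toml keeps the version under [package], pyproject.toml under [project]
    return data.get("package", data.get("project", {})).get("version", "")

cargo = read_version("Cargo.toml")
pyproject = read_version("pyproject.toml")

with open("CHANGELOG.md") as f:
    match = re.search(r"^## \[(\d+\.\d+\.\d+)\]", f.read(), re.MULTILINE)
changelog = match.group(1) if match else ""

if len({cargo, pyproject, changelog}) != 1:
    raise SystemExit(
        f"Version mismatch: Cargo={cargo} pyproject={pyproject} CHANGELOG={changelog}"
    )
print(f"Versions consistent: {cargo}")
```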
When releasing a new version, ensure all version numbers are updated:
Cargo.toml: version = "X.Y.Z"
pyproject.toml: version = "X.Y.Z"
CHANGELOG.md: Add new release section ## [X.Y.Z] - YYYY-MM-DD
The pre-commit hook and CI pipeline will verify version consistency automatically.
For more details, see Version Management Guide.
MIT OR Apache-2.0