| Crates.io | json-register |
| lib.rs | json-register |
| version | 0.2.0 |
| created_at | 2025-11-21 18:40:23.325224+00 |
| updated_at | 2026-01-12 16:41:06.068685+00 |
| description | A Rust library for registering JSON objects in PostgreSQL with canonicalisation and caching |
| homepage | |
| repository | https://github.com/telicent-oss/json-register |
| max_upload_size | |
| id | 1943974 |
| size | 163,194 |
Note: This library is currently in beta. The API is stable but may change in future releases based on user feedback and production usage.
json-register is a caching registry for JSON objects, with storage in a PostgreSQL database, using their JSONB encoding. It ensures that semantically equivalent JSON objects are cached only once by employing a canonicalisation strategy in the cache, and using JSONB comparisons in the database. The database assigns a unique 32-bit integer identifier to each object.
This library is written in Rust and provides native bindings for Python, allowing for seamless integration into applications written in either language.
Prerequisite: a PostgreSQL database that supports the JSONB type.

Add the following to your Cargo.toml:
[dependencies]
# Matches the currently published crate version; a "0.1.0" requirement would never resolve to 0.2.x.
json-register = "0.2.0"
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"
Ensure you have a compatible Python environment (3.8+) and install the package.
Currently available on TestPyPI:
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ json-register-rust
Once published to PyPI:
pip install json-register-rust
Before using json-register, create the required table and index in your PostgreSQL database:
-- Backing table for the registry: one row per distinct JSON object.
CREATE TABLE IF NOT EXISTS json_objects (
-- Integer identifier assigned by the database to each registered object.
id SERIAL PRIMARY KEY,
-- The object itself, stored as JSONB; UNIQUE prevents the same JSONB value being stored twice.
json_object JSONB UNIQUE NOT NULL
);
-- GIN index for efficient containment and path queries on the JSONB column.
CREATE INDEX IF NOT EXISTS idx_json_objects_gin ON json_objects USING GIN (json_object);
The GIN index enables efficient containment and path queries on the JSONB column. You can customise the table name, id column, and jsonb column names - just ensure they match your Register / JsonRegister configuration.
The following example demonstrates how to initialize the registry and register JSON objects using the Rust API.
use json_register::Register;
use serde_json::json;
use std::error::Error;
/// Connects to PostgreSQL, builds a `Register`, then registers a single
/// object followed by a small batch, printing the IDs that come back.
///
/// The table and column names must match the schema created in the database
/// (table `json_objects`, columns `id` and `json_object`).
///
/// # Errors
///
/// Propagates any error returned while creating the register or while
/// registering objects.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Configuration parameters
    let connection_string = "postgres://user:password@localhost:5432/dbname";
    let table_name = "json_objects";
    let id_column = "id";
    // Must name the JSONB column of the table created beforehand.
    let jsonb_column = "json_object";
    let pool_size = 10;
    let lru_cache_size = 1000;

    // Initialize the register
    let register = Register::new(
        connection_string,
        table_name,
        id_column,
        jsonb_column,
        pool_size,
        lru_cache_size,
        None, // acquire_timeout_secs (defaults to 5)
        None, // idle_timeout_secs (defaults to 600)
        None, // max_lifetime_secs (defaults to 1800)
    )
    .await?;

    // Register a single object
    let object = json!({
        "name": "Alice",
        "role": "Engineer",
        "active": true
    });
    let id = register.register_object(&object).await?;
    println!("Registered object with ID: {}", id);

    // Register a batch of objects
    let batch = vec![
        json!({"name": "Bob", "role": "Manager"}),
        json!({"name": "Charlie", "role": "Designer"}),
    ];
    let ids = register.register_batch_objects(&batch).await?;
    println!("Registered batch IDs: {:?}", ids);

    Ok(())
}
The following example demonstrates how to use the library within a Python application using the synchronous API.
from json_register import JsonRegister
def main():
    """Register one object and a small batch using the synchronous API.

    The table and column names must match the schema created in the
    database (table ``json_objects``, columns ``id`` and ``json_object``).
    """
    # Initialize the register
    register = JsonRegister(
        database_name="dbname",
        database_host="localhost",
        database_port=5432,
        database_user="user",
        database_password="password",
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        # Must name the JSONB column of the table created beforehand.
        jsonb_column="json_object",
        pool_size=10
    )

    # Register a single object
    obj = {
        "name": "Alice",
        "role": "Engineer",
        "active": True
    }
    obj_id = register.register_object(obj)
    print(f"Registered object with ID: {obj_id}")

    # Register a batch of objects
    batch = [
        {"name": "Bob", "role": "Manager"},
        {"name": "Charlie", "role": "Designer"}
    ]
    batch_ids = register.register_batch_objects(batch)
    print(f"Registered batch IDs: {batch_ids}")


if __name__ == "__main__":
    main()
For async Python applications (FastAPI, aiohttp, etc.), use the async variants to avoid blocking the event loop.
from json_register import JsonRegister
import asyncio
async def main():
    """Register one object and a small batch using the async API.

    The table and column names must match the schema created in the
    database (table ``json_objects``, columns ``id`` and ``json_object``).
    """
    # Initialize the register (constructor is synchronous)
    register = JsonRegister(
        database_name="dbname",
        database_host="localhost",
        database_port=5432,
        database_user="user",
        database_password="password",
        lru_cache_size=1000,
        table_name="json_objects",
        id_column="id",
        # Must name the JSONB column of the table created beforehand.
        jsonb_column="json_object",
        pool_size=10
    )

    # Register a single object asynchronously
    obj = {
        "name": "Alice",
        "role": "Engineer",
        "active": True
    }
    obj_id = await register.register_object_async(obj)
    print(f"Registered object with ID: {obj_id}")

    # Register a batch of objects asynchronously
    batch = [
        {"name": "Bob", "role": "Manager"},
        {"name": "Charlie", "role": "Designer"}
    ]
    batch_ids = await register.register_batch_objects_async(batch)
    print(f"Registered batch IDs: {batch_ids}")


if __name__ == "__main__":
    asyncio.run(main())
Optional timeout parameters can be specified when initializing the register. All timeouts are in seconds.
- acquire_timeout_secs: Timeout for acquiring a connection from the pool (default: 5)
- idle_timeout_secs: Timeout before closing idle connections (default: 600)
- max_lifetime_secs: Maximum lifetime of a connection (default: 1800)

let register = Register::new(
connection_string,
table_name,
id_column,
jsonb_column,
pool_size,
lru_cache_size,
Some(10), // 10 second acquire timeout
Some(300), // 5 minute idle timeout
Some(3600), // 1 hour max lifetime
).await?;
# Python equivalent: the pool timeouts are optional keyword arguments, given in seconds.
register = JsonRegister(
database_name="dbname",
database_host="localhost",
database_port=5432,
database_user="user",
database_password="password",
acquire_timeout_secs=10, # 10 second acquire timeout
idle_timeout_secs=300, # 5 minute idle timeout
max_lifetime_secs=3600, # 1 hour max lifetime
)
The library provides comprehensive telemetry metrics for integration with monitoring systems such as Prometheus, OpenTelemetry, or custom logging. All metrics can be retrieved individually or as a complete snapshot.
- pool_size(): Total number of connections in the pool (idle and active)
- idle_connections(): Number of idle connections available for use
- active_connections(): Number of connections currently in use
- is_closed(): Whether the connection pool is closed
- cache_hits(): Total number of successful cache lookups
- cache_misses(): Total number of unsuccessful cache lookups
- cache_hit_rate(): Hit rate as a percentage (0.0 to 100.0)
- cache_size(): Current number of items in the cache
- cache_capacity(): Maximum cache capacity
- cache_evictions(): Total number of items evicted from the cache
- db_queries_total(): Total number of database queries executed
- db_query_errors(): Total number of failed database queries
- register_single_calls(): Number of times register_object was called
- register_batch_calls(): Number of times register_batch_objects was called
- total_objects_registered(): Total number of objects registered across all calls

The telemetry_metrics() method (Rust only) returns a complete snapshot of all metrics in a single call, which is useful for OpenTelemetry exporters.
// Get all metrics at once (recommended for OpenTelemetry)
// The snapshot exposes each metric as a field of the returned value.
let metrics = register.telemetry_metrics();
println!("Cache: {}/{} items, {} evictions", metrics.cache_size, metrics.cache_capacity, metrics.cache_evictions);
// `{:.2}` prints the hit rate (a percentage, 0.0 to 100.0) with two decimal places.
println!("Cache performance: {} hits, {} misses ({:.2}% hit rate)",
metrics.cache_hits, metrics.cache_misses, metrics.cache_hit_rate);
println!("Pool: {} total, {} active, {} idle",
metrics.pool_size, metrics.active_connections, metrics.idle_connections);
println!("Database: {} queries, {} errors",
metrics.db_queries_total, metrics.db_query_errors);
println!("Operations: {} objects registered ({} single + {} batch calls)",
metrics.total_objects_registered, metrics.register_single_calls, metrics.register_batch_calls);
// Or query individual metrics
// Each metric is also available through its own accessor method on the register.
let hit_rate = register.cache_hit_rate();
let active = register.active_connections();
# Individual metrics
# telemetry_metrics() is Rust only, so from Python each metric is read through its own method.
print(f"Cache: {register.cache_size()}/{register.cache_capacity()} items")
print(f"Cache evictions: {register.cache_evictions()}")
print(f"Active connections: {register.active_connections()}")
print(f"DB queries: {register.db_queries_total()}, errors: {register.db_query_errors()}")
print(f"Objects registered: {register.total_objects_registered()}")
print(f"Single calls: {register.register_single_calls()}, Batch calls: {register.register_batch_calls()}")
The library uses the tracing crate for structured logging. Logs include connection info, cache hit/miss statistics, and batch sizes.
Use tracing-subscriber to see logs:
use tracing_subscriber::EnvFilter;
// Install a formatting subscriber so the library's tracing events are printed.
// The filter is taken from the environment, so log levels are controlled with RUST_LOG.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
Set the RUST_LOG environment variable to control log levels:
# See debug logs from json-register (cache statistics, batch sizes, database queries)
RUST_LOG=json_register=debug cargo run
# See trace logs (individual cache hits/misses; verbose)
RUST_LOG=json_register=trace cargo run
Logs are automatically bridged to Python's logging module:
import logging
# Configure Python logging as usual
# %(name)s in the format prints the logger name, so json-register lines are easy to spot.
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
# Logs from json-register will appear with logger name 'json_register'
# You can also configure just the json_register logger:
logging.getLogger('json_register').setLevel(logging.DEBUG)
| Level | Content |
|---|---|
| INFO | Connection events, configuration |
| DEBUG | Cache statistics, batch sizes, database queries |
| TRACE | Individual cache hits/misses (verbose) |
This project is licensed under the Apache-2.0 License.