Crates.io | pjson-rs |
lib.rs | pjson-rs |
version | 0.4.1 |
created_at | 2025-08-11 14:25:38.601189+00 |
updated_at | 2025-08-13 14:38:25.764579+00 |
description | Priority JSON Streaming Protocol - high-performance priority-based JSON streaming (requires nightly Rust) |
homepage | |
repository | https://github.com/bug-ops/pjs |
max_upload_size | |
id | 1790240 |
size | 1,023,517 |
ð 6.3x faster than serde_json | ðŊ 5.3x faster progressive loading | ðū Bounded memory usage | ðïļ Production Ready
New in v0.3.0: Production-ready code quality with zero clippy warnings, Clean Architecture compliance, and comprehensive test coverage (196 tests). Now requires nightly Rust for zero-cost abstractions.
⥠Blazing Fast
|
ðŊ Smart Streaming
|
ðū Memory Efficient
|
ð§ Production Ready
|
ð Schema Aware
|
ð Developer Friendly
|
Modern web applications face a fundamental challenge: large JSON responses block UI rendering.
Solution | Problem |
---|---|
Pagination | Requires multiple round-trips, complex state management |
GraphQL | Still sends complete response, just smaller |
JSON streaming | No semantic understanding, can't prioritize |
Compression | Reduces size but not time-to-first-byte |
PJS revolutionizes JSON transmission by understanding your data semantically and prioritizing what matters.
#[derive(JsonPriority)]
struct UserDashboard {
#[priority(critical)] // Sent in first 10ms
user_id: u64,
user_name: String,
#[priority(high)] // Sent in next 50ms
recent_activity: Vec<Activity>,
notifications: Vec<Notification>,
#[priority(low)] // Sent when available
detailed_analytics: Analytics, // 4MB of data
}
Traditional JSON Loading:
[ââââââââââââââââââââ] 100% - 2000ms - Full UI renders
PJS Loading:
[ââââââââââââââââââââ] 10% - 10ms - Critical UI visible
[ââââââââââââââââââââ] 30% - 50ms - Interactive UI
[ââââââââââââââââââââ] 100% - 2000ms - Full data loaded
User Experience: ⥠Instant â ð Happy
Production-ready Axum integration with full REST API, session management, and real-time streaming.
Clean architecture with CQRS pattern, event sourcing, and ports & adapters for maximum testability and maintainability.
Automatic format detection supporting JSON, NDJSON, and Server-Sent Events based on client Accept headers.
Powered by sonic-rs
for blazing fast JSON processing with zero-copy operations.
Complete WebSocket implementation with priority-based frame delivery:
format!("{var}")
syntax throughoutJsonData
value object replacing serde_json::Value
From
trait implementations for JsonData â serde_json::Value
:param
to {param}
format-D warnings
)Metric | serde_json | sonic-rs | PJS | PJS Advantage |
---|---|---|---|---|
Small JSON (43B) | 275ns | 129ns | 312ns | Competitive |
Medium JSON (351B) | 1,662ns | 434ns | 590ns | 2.8x vs serde |
Large JSON (357KB) | 1,294Ξs | 216Ξs | 204Ξs | 6.3x vs serde, 1.06x vs sonic |
Memory Efficiency | Baseline | Fast | 5.3x faster progressive | Bounded memory |
Progressive Loading | Batch-only | Batch-only | 37Ξs vs 198Ξs | 5.3x faster |
Add PJS to your Cargo.toml
:
[dependencies]
pjson-rs = "0.3.0"
# Optional: for HTTP server integration
axum = "0.8"
tokio = { version = "1", features = ["full"] }
Or use cargo:
cargo add pjson-rs
use std::sync::Arc;
use pjson_rs::{
application::{
handlers::{InMemoryCommandHandler, InMemoryQueryHandler},
services::{SessionService, StreamingService},
},
infrastructure::{
adapters::{InMemoryStreamRepository, InMemoryEventPublisher, InMemoryMetricsCollector},
http::axum_adapter::{create_pjs_router, PjsAppState},
},
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create infrastructure
let repository = Arc::new(InMemoryStreamRepository::new());
let event_publisher = Arc::new(InMemoryEventPublisher::new());
let metrics_collector = Arc::new(InMemoryMetricsCollector::new());
// Create CQRS handlers
let command_handler = Arc::new(InMemoryCommandHandler::new(
repository.clone(), event_publisher, metrics_collector.clone()
));
let query_handler = Arc::new(InMemoryQueryHandler::new(repository, metrics_collector));
// Create services
let session_service = Arc::new(SessionService::new(command_handler.clone(), query_handler.clone()));
let streaming_service = Arc::new(StreamingService::new(command_handler));
// Build Axum app
let app = create_pjs_router()
.with_state(PjsAppState::new(session_service, streaming_service));
// Start server
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
println!("ð PJS Server running on http://127.0.0.1:3000");
axum::serve(listener, app).await?;
Ok(())
}
Use the official PJS client library for seamless integration:
npm install @pjs/client
import { PjsClient, createHttpTransport } from '@pjs/client';
// Create client with HTTP transport
const client = new PjsClient({
transport: createHttpTransport({
baseUrl: 'http://localhost:3000',
format: 'sse' // or 'json', 'ndjson'
})
});
// Stream data with priority-based delivery
await client.stream({
data: {
users: [/* large array */],
dashboard: { /* complex object */ }
},
onFrame: (frame) => {
// Frames arrive in priority order
if (frame.priority >= 90) {
updateUI(frame.data); // Critical data first
}
}
});
import { PjsClient, createWebSocketTransport } from '@pjs/client';
const client = new PjsClient({
transport: createWebSocketTransport({
url: 'ws://localhost:3001/ws'
})
});
// Real-time streaming with priority handling
await client.connect();
client.onFrame((frame) => {
console.log(`Priority ${frame.priority}:`, frame.data);
// Handle based on priority
switch (frame.priority) {
case 'critical':
showImmediate(frame.data);
break;
case 'high':
queueForNextFrame(frame.data);
break;
default:
processInBackground(frame.data);
}
});
use pjson_rs::{
ApplicationResult,
domain::value_objects::SessionId,
};
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() -> ApplicationResult<()> {
// Connect to WebSocket streaming server
let (ws_stream, _) = connect_async("ws://127.0.0.1:3001/ws")
.await
.expect("Failed to connect");
let (mut write, mut read) = ws_stream.split();
// Receive prioritized frames
while let Some(message) = read.next().await {
match message? {
Message::Text(text) => {
let frame: serde_json::Value = serde_json::from_str(&text)?;
match frame["@type"].as_str() {
Some("pjs_frame") => {
let priority = frame["@priority"].as_u64().unwrap_or(0);
if priority >= 200 {
println!("ðĻ Critical data: {}", frame["data"]);
} else if priority >= 100 {
println!("ð High priority: {}", frame["data"]);
} else {
println!("ð Background data received");
}
}
Some("stream_complete") => {
println!("â
Stream completed!");
break;
}
_ => {}
}
}
_ => {}
}
}
Ok(())
}
Start the interactive demos to see PJS in action:
# WebSocket streaming server with priority-based delivery
cargo run --bin websocket_streaming --manifest-path crates/pjs-demo/Cargo.toml
# Interactive demo with HTML interface and real-time visualization
cargo run --bin interactive_demo --manifest-path crates/pjs-demo/Cargo.toml
# Simple demo server with basic streaming
cargo run --bin simple_demo --manifest-path crates/pjs-demo/Cargo.toml
# Performance comparison demo (PJS vs traditional JSON)
cargo run --bin performance_comparison --manifest-path crates/pjs-demo/Cargo.toml
Or run root-level examples:
# Complete Axum HTTP server
cargo run --example axum_server
# Advanced streaming demo server
cargo run --example streaming_demo_server
# Simple usage patterns
cargo run --example simple_usage
Then visit http://127.0.0.1:3000
to see priority-based streaming in action.
Perfect for:
PJS implements a clean, layered architecture following Domain-Driven Design principles:
Core business logic with value objects (Priority, SessionId, JsonPath) and aggregates (StreamSession) ensuring data consistency.
CQRS pattern with separate Command and Query handlers, plus high-level services (SessionService, StreamingService) orchestrating workflows.
Adapters implementing domain ports:
Multi-format streaming support:
Intelligent frame processing:
pjs/
âââ crates/
â âââ pjs-core/ # Core protocol, domain logic, and HTTP integration
â â âââ src/
â â â âââ application/ # CQRS handlers, services, DTOs
â â â âââ domain/ # Value objects, entities, aggregates
â â â âââ infrastructure/ # HTTP, WebSocket, repositories, adapters
â â â âââ parser/ # SIMD, zero-copy, buffer pools
â â â âââ stream/ # Priority streaming, reconstruction
â â â âââ compression/ # Schema-based compression
â â âââ examples/ # Standalone demos (zero-copy, compression)
â â âââ tests/ # Integration tests
â âââ pjs-demo/ # Interactive demo servers with WebSocket streaming
â â âââ src/
â â âââ servers/ # Demo server implementations
â â âââ clients/ # WebSocket client demos
â â âââ data/ # Sample data generators (analytics, ecommerce)
â â âââ static/ # HTML interfaces
â âââ pjs-js-client/ # JavaScript/TypeScript client library â
IMPLEMENTED
â â âââ src/ # TypeScript source code with transport layers
â â âââ tests/ # Jest test suite with full coverage
â â âââ package.json # NPM configuration and dependencies
â âââ pjs-bench/ # Benchmarking suite
â âââ benches/ # Criterion.rs performance benchmarks
âââ examples/ # Root-level usage examples
âââ axum_server.rs # Complete HTTP server demo
âââ simple_usage.rs # Basic usage patterns
âââ streaming_demo_server.rs # Advanced streaming demo
The server provides a complete REST API:
# Create a new session
POST /pjs/sessions
Content-Type: application/json
{
"max_concurrent_streams": 10,
"timeout_seconds": 3600,
"client_info": "My App v1.0"
}
# Response: { "session_id": "sess_abc123", "expires_at": "..." }
# Get session info
GET /pjs/sessions/{session_id}
# Start streaming data
POST /pjs/stream/{session_id}
Content-Type: application/json
{
"data": { "users": [...], "products": [...] },
"priority_threshold": 50,
"max_frames": 100
}
# Stream frames (JSON format)
GET /pjs/stream/{session_id}/frames?format=json&priority=80
# Real-time Server-Sent Events
GET /pjs/stream/{session_id}/sse
Accept: text/event-stream
# System health check
GET /pjs/health
# Response: { "status": "healthy", "version": "0.3.0" }
A complete working server is available at examples/axum_server.rs
. To run it:
# Start the server
cargo run --example axum_server
# Test endpoints
curl -X POST http://localhost:3000/pjs/sessions \
-H "Content-Type: application/json" \
-d '{"max_concurrent_streams": 5}'
# Check health
curl http://localhost:3000/pjs/health
# View metrics
curl http://localhost:3000/examples/metrics
impl Trait
in associated types)# Install nightly Rust
rustup install nightly
# Set nightly for this project
rustup override set nightly
# Or use nightly globally
rustup default nightly
# Clone repository
git clone https://github.com/bug-ops/pjs
cd pjs
# Ensure nightly Rust is active
rustup override set nightly
# Build with optimizations
cargo build --release
# Run tests
cargo test --workspace
# Run the complete HTTP server example
cargo run --example axum_server
# Build with optional features
cargo build --features "http-client,prometheus-metrics"
http-client
: Enable HTTP-based event publishingprometheus-metrics
: Enable Prometheus metrics collectionsimd-auto
: Auto-detect best SIMD support (default)compression
: Enable compression middlewarePJS provides true zero-cost abstractions using nightly Rust features for maximum performance. The Universal Framework Integration Layer uses Generic Associated Types (GATs) with impl Trait
to eliminate all runtime overhead:
use pjson_rs::infrastructure::integration::StreamingAdapter;
use std::future::Future;
// Zero-cost framework integration with GATs
impl StreamingAdapter for YourFramework {
type Request = YourRequest;
type Response = YourResponse;
type Error = YourError;
// TRUE zero-cost futures - no Box allocation!
type StreamingResponseFuture<'a> = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
where
Self: 'a;
fn create_streaming_response<'a>(
&'a self,
session_id: SessionId,
frames: Vec<StreamFrame>,
format: StreamingFormat,
) -> Self::StreamingResponseFuture<'a> {
// Direct async block - compiler generates optimal Future type
async move {
// Your framework-specific logic here
Ok(your_response)
}
}
fn framework_name(&self) -> &'static str {
"your_framework"
}
}
Zero-Cost Abstractions:
// Axum (native support)
use pjson_rs::infrastructure::http::axum_adapter::create_pjs_router;
let app = create_pjs_router().with_state(app_state);
// Custom framework integration
use pjson_rs::infrastructure::adapters::UniversalAdapter;
let adapter = UniversalAdapter::new()
.with_serializer(your_serializer)
.with_transport(your_transport);
The HTTP server includes production-ready middleware:
use pjson_rs::infrastructure::http::middleware::*;
let app = create_pjs_router()
.layer(axum::middleware::from_fn(pjs_cors_middleware))
.layer(axum::middleware::from_fn(security_middleware))
.layer(axum::middleware::from_fn(health_check_middleware))
.layer(PjsMiddleware::new()
.with_compression(true)
.with_metrics(true)
.with_max_request_size(10 * 1024 * 1024))
.with_state(app_state);
Built-in Prometheus metrics support:
// Automatically tracks:
// - pjs_active_sessions
// - pjs_total_sessions_created
// - pjs_frames_processed_total
// - pjs_bytes_streamed_total
// - pjs_frame_processing_time_ms
let metrics = collector.export_prometheus();
// Expose at /metrics endpoint for Prometheus scraping
Comprehensive domain event tracking:
// Events automatically generated:
// - SessionCreated, SessionActivated, SessionEnded
// - StreamStarted, StreamCompleted, FrameGenerated
// - PriorityAdjusted, ErrorOccurred
publisher.subscribe("SessionCreated", |event| {
println!("New session: {}", event.session_id());
});
We welcome contributions! See CONTRIBUTING.md for guidelines.
# Install development tools
rustup component add clippy rustfmt
# Run checks
cargo clippy --workspace
cargo fmt --check
# Run all tests
cargo test --workspace --all-features
Licensed under either of:
at your option.
Want to try PJS immediately? Here's the fastest way:
# Clone and run
git clone https://github.com/bug-ops/pjs
cd pjs
# Set nightly Rust (required)
rustup override set nightly
# Run the server
cargo run --example axum_server
# In another terminal, test the API
curl -X POST http://localhost:3000/pjs/sessions \
-H "Content-Type: application/json" \
-d '{"max_concurrent_streams": 5}'
# Try Server-Sent Events streaming
curl -N -H "Accept: text/event-stream" \
http://localhost:3000/pjs/stream/{session_id}/sse
To verify the performance claims, run the comprehensive benchmark suite:
# Run all benchmarks
cargo bench -p pjs-bench
# Or run specific benchmarks:
cargo bench -p pjs-bench --bench simple_throughput # Core parsing speed
cargo bench -p pjs-bench --bench memory_benchmarks # Memory efficiency
cargo bench -p pjs-bench --bench streaming_benchmarks # Progressive loading
Results show PJS 6.3x faster than serde_json and 1.06x faster than sonic-rs on large JSON.
The server will show:
Built with:
PJS: Because users shouldn't wait for data they don't need yet.