pjson-rs

Crates.io: pjson-rs
lib.rs: pjson-rs
version: 0.4.1
created_at: 2025-08-11 14:25:38.601189+00
updated_at: 2025-08-13 14:38:25.764579+00
description: Priority JSON Streaming Protocol - high-performance priority-based JSON streaming (requires nightly Rust)
homepage:
repository: https://github.com/bug-ops/pjs
max_upload_size:
id: 1790240
size: 1,023,517
Andrei G (bug-ops)


PJS - Priority JSON Streaming Protocol


🚀 6.3x faster than serde_json | 🎯 5.3x faster progressive loading | 💾 Bounded memory usage | 🏗️ Production Ready

New in v0.3.0: Production-ready code quality with zero clippy warnings, Clean Architecture compliance, and comprehensive test coverage (196 tests). Now requires nightly Rust for zero-cost abstractions.

🌟 Key Features

⚡ Blazing Fast

  • 6.3x faster than serde_json
  • 1.71 GiB/s throughput
  • SIMD-accelerated parsing

🎯 Smart Streaming

  • Skeleton-first delivery
  • Priority-based transmission
  • Progressive enhancement

💾 Memory Efficient

  • 5.3x faster progressive loading
  • Bounded memory usage
  • Zero-copy operations

🔧 Production Ready

  • All tests passing
  • Clean Architecture

📊 Schema Aware

  • Automatic compression
  • Semantic analysis
  • Type optimization

🚀 Developer Friendly

  • Simple API
  • Drop-in replacement
  • Extensive documentation

🎯 The Problem

Modern web applications face a fundamental challenge: large JSON responses block UI rendering.

Current State

Why existing solutions fall short

| Solution | Problem |
|----------|---------|
| Pagination | Requires multiple round-trips, complex state management |
| GraphQL | Still sends complete response, just smaller |
| JSON streaming | No semantic understanding, can't prioritize |
| Compression | Reduces size but not time-to-first-byte |

✨ The Solution: PJS

PJS attaches a priority to each part of a JSON document and transmits the highest-priority parts first, so a client can render critical data before the full payload has arrived.

Core Innovation: Semantic Prioritization

#[derive(JsonPriority)]
struct UserDashboard {
    // Critical: sent in first 10ms
    #[priority(critical)]
    user_id: u64,
    #[priority(critical)]
    user_name: String,

    // High: sent in next 50ms
    #[priority(high)]
    recent_activity: Vec<Activity>,
    #[priority(high)]
    notifications: Vec<Notification>,

    // Low: sent when available
    #[priority(low)]
    detailed_analytics: Analytics,  // 4MB of data
}
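The ordering these annotations imply can be sketched in plain Rust: tag each field with a numeric priority and emit higher priorities first. This is an illustration only. The `Field` type, the `emission_order` function, and the numeric values (200 for critical, 100 for high, 10 for low) are assumptions made for the sketch, not the crate's API.

```rust
/// A field tagged with a numeric priority (higher is sent earlier).
/// Hypothetical type for illustration; not part of pjson-rs.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: &'static str,
    priority: u8,
}

/// Returns field names in transmission order: highest priority first,
/// with declaration order preserved among fields of equal priority.
fn emission_order(fields: &[Field]) -> Vec<&'static str> {
    let mut sorted: Vec<&Field> = fields.iter().collect();
    // `sort_by` is stable, so equal priorities keep their original order.
    sorted.sort_by(|a, b| b.priority.cmp(&a.priority));
    sorted.into_iter().map(|f| f.name).collect()
}

/// The dashboard fields from the struct above, deliberately shuffled,
/// with assumed numeric priorities.
fn dashboard_fields() -> Vec<Field> {
    vec![
        Field { name: "detailed_analytics", priority: 10 },
        Field { name: "user_id", priority: 200 },
        Field { name: "recent_activity", priority: 100 },
        Field { name: "user_name", priority: 200 },
        Field { name: "notifications", priority: 100 },
    ]
}
```

Calling `emission_order(&dashboard_fields())` yields the identity fields first, then activity and notifications, and the large analytics payload last.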

Real-World Impact

Traditional JSON Loading:
[████████████████████] 100% - 2000ms - Full UI renders

PJS Loading:
[██░░░░░░░░░░░░░░░░░░] 10%  - 10ms   - Critical UI visible
[██████░░░░░░░░░░░░░░] 30%  - 50ms   - Interactive UI
[████████████████████] 100% - 2000ms - Full data loaded

User Experience: ⚡ Instant → 😊 Happy

Key Features

🚀 Complete HTTP Server Integration

Production-ready Axum integration with full REST API, session management, and real-time streaming.

🎯 Advanced Streaming Implementations

🏗️ Domain-Driven Design Architecture

Clean architecture with CQRS pattern, event sourcing, and ports & adapters for maximum testability and maintainability.

📊 Production-Ready Infrastructure

🔄 Multiple Response Formats

Automatic format detection supporting JSON, NDJSON, and Server-Sent Events based on client Accept headers.
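A minimal sketch of Accept-based format selection follows. It is not the crate's implementation: the `StreamFormat` enum and `format_from_accept` function are hypothetical names, `application/x-ndjson` is assumed as the NDJSON media type, and q-values are ignored for brevity.

```rust
/// Response formats named in the text above.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum StreamFormat {
    Json,
    Ndjson,
    ServerSentEvents,
}

/// Picks a format from an `Accept` header value, defaulting to JSON.
/// The first recognised media type wins; a full negotiator would rank by q-value.
fn format_from_accept(accept: &str) -> StreamFormat {
    for item in accept.split(',') {
        // Drop parameters such as `;q=0.9` and normalise case.
        let media_type = item
            .split(';')
            .next()
            .unwrap_or("")
            .trim()
            .to_ascii_lowercase();
        match media_type.as_str() {
            "text/event-stream" => return StreamFormat::ServerSentEvents,
            "application/x-ndjson" => return StreamFormat::Ndjson,
            "application/json" => return StreamFormat::Json,
            _ => continue,
        }
    }
    // Nothing recognised (including `*/*` or an empty header): fall back to JSON.
    StreamFormat::Json
}
```

Defaulting to plain JSON keeps clients that send no Accept header, or only `*/*`, working without changes.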

⚡ SIMD-Accelerated Parsing

Powered by sonic-rs for blazing fast JSON processing with zero-copy operations.

🔄 Real-Time WebSocket Streaming

Complete WebSocket implementation with priority-based frame delivery:

🎉 What's New in v0.3.0

🛠️ Production-Ready Code Quality

🏗️ Clean Architecture Enforcement

🌐 HTTP/WebSocket Modernization

🔧 Technical Debt Resolution

Benchmarks

🚀 Actual Performance Results

| Metric | serde_json | sonic-rs | PJS | PJS Advantage |
|--------|------------|----------|-----|---------------|
| Small JSON (43B) | 275ns | 129ns | 312ns | Competitive |
| Medium JSON (351B) | 1,662ns | 434ns | 590ns | 2.8x vs serde |
| Large JSON (357KB) | 1,294μs | 216μs | 204μs | 6.3x vs serde, 1.06x vs sonic |
| Memory Efficiency | Baseline | Fast | 5.3x faster progressive | Bounded memory |
| Progressive Loading | Batch-only | Batch-only | 37μs vs 198μs | 5.3x faster |
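The advantage figures can be re-derived from the raw timings in the table. The helper names below are made up for this check; the inputs are the table's own numbers.

```rust
/// Speedup of `candidate` over `baseline`, both in the same time unit.
/// Values above 1.0 mean the candidate is faster; below 1.0, slower.
fn speedup(baseline: f64, candidate: f64) -> f64 {
    baseline / candidate
}

/// Rounds to two decimal places for display.
fn round2(x: f64) -> f64 {
    (x * 100.0).round() / 100.0
}
```

With the table's timings, `round2(speedup(1294.0, 204.0))` gives 6.34 (large JSON vs serde_json), `round2(speedup(216.0, 204.0))` gives 1.06 (vs sonic-rs), `round2(speedup(1662.0, 590.0))` gives 2.82 (medium JSON), and `round2(speedup(198.0, 37.0))` gives 5.35 (progressive loading). For the 43B case, `round2(speedup(275.0, 312.0))` gives 0.88, so PJS is somewhat slower than serde_json on very small inputs.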

🎯 Key Performance Achievements

Installation

Add PJS to your Cargo.toml:

[dependencies]
pjson-rs = "0.3.0"

# Optional: for HTTP server integration
axum = "0.8"
tokio = { version = "1", features = ["full"] }

Or use cargo:

cargo add pjson-rs

Quick Start

HTTP Server with Axum Integration

use std::sync::Arc;
use pjson_rs::{
    application::{
        handlers::{InMemoryCommandHandler, InMemoryQueryHandler},
        services::{SessionService, StreamingService},
    },
    infrastructure::{
        adapters::{InMemoryStreamRepository, InMemoryEventPublisher, InMemoryMetricsCollector},
        http::axum_adapter::{create_pjs_router, PjsAppState},
    },
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create infrastructure
    let repository = Arc::new(InMemoryStreamRepository::new());
    let event_publisher = Arc::new(InMemoryEventPublisher::new());
    let metrics_collector = Arc::new(InMemoryMetricsCollector::new());
    
    // Create CQRS handlers
    let command_handler = Arc::new(InMemoryCommandHandler::new(
        repository.clone(), event_publisher, metrics_collector.clone()
    ));
    let query_handler = Arc::new(InMemoryQueryHandler::new(repository, metrics_collector));
    
    // Create services
    let session_service = Arc::new(SessionService::new(command_handler.clone(), query_handler.clone()));
    let streaming_service = Arc::new(StreamingService::new(command_handler));
    
    // Build Axum app
    let app = create_pjs_router()
        .with_state(PjsAppState::new(session_service, streaming_service));
    
    // Start server
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    println!("🚀 PJS Server running on http://127.0.0.1:3000");
    axum::serve(listener, app).await?;
    
    Ok(())
}

JavaScript/TypeScript Client Library

Use the official PJS client library for seamless integration:

npm install @pjs/client

HTTP Streaming

import { PjsClient, createHttpTransport } from '@pjs/client';

// Create client with HTTP transport
const client = new PjsClient({
    transport: createHttpTransport({
        baseUrl: 'http://localhost:3000',
        format: 'sse' // or 'json', 'ndjson'
    })
});

// Stream data with priority-based delivery
await client.stream({
    data: { 
        users: [/* large array */],
        dashboard: { /* complex object */ }
    },
    onFrame: (frame) => {
        // Frames arrive in priority order
        if (frame.priority >= 90) {
            updateUI(frame.data); // Critical data first
        }
    }
});

WebSocket Real-Time Streaming

import { PjsClient, createWebSocketTransport } from '@pjs/client';

const client = new PjsClient({
    transport: createWebSocketTransport({
        url: 'ws://localhost:3001/ws'
    })
});

// Real-time streaming with priority handling
await client.connect();

client.onFrame((frame) => {
    console.log(`Priority ${frame.priority}:`, frame.data);
    
    // Handle based on priority
    switch (frame.priority) {
        case 'critical':
            showImmediate(frame.data);
            break;
        case 'high':
            queueForNextFrame(frame.data);
            break;
        default:
            processInBackground(frame.data);
    }
});

WebSocket Streaming

use pjson_rs::{
    ApplicationResult,
    domain::value_objects::SessionId,
};
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> ApplicationResult<()> {
    // Connect to WebSocket streaming server
    let (ws_stream, _) = connect_async("ws://127.0.0.1:3001/ws")
        .await
        .expect("Failed to connect");
    
    // Only the read half is used here; the write half is kept alive but unused
    let (_write, mut read) = ws_stream.split();
    
    // Receive prioritized frames
    while let Some(message) = read.next().await {
        match message? {
            Message::Text(text) => {
                let frame: serde_json::Value = serde_json::from_str(&text)?;
                
                match frame["@type"].as_str() {
                    Some("pjs_frame") => {
                        let priority = frame["@priority"].as_u64().unwrap_or(0);
                        
                        if priority >= 200 {
                            println!("🚨 Critical data: {}", frame["data"]);
                        } else if priority >= 100 {
                            println!("📊 High priority: {}", frame["data"]);
                        } else {
                            println!("📝 Background data received");
                        }
                    }
                    Some("stream_complete") => {
                        println!("✅ Stream completed!");
                        break;
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }
    
    Ok(())
}

Demo Servers

Start the interactive demos to see PJS in action:

# WebSocket streaming server with priority-based delivery
cargo run --bin websocket_streaming --manifest-path crates/pjs-demo/Cargo.toml

# Interactive demo with HTML interface and real-time visualization
cargo run --bin interactive_demo --manifest-path crates/pjs-demo/Cargo.toml

# Simple demo server with basic streaming
cargo run --bin simple_demo --manifest-path crates/pjs-demo/Cargo.toml

# Performance comparison demo (PJS vs traditional JSON)
cargo run --bin performance_comparison --manifest-path crates/pjs-demo/Cargo.toml

Or run root-level examples:

# Complete Axum HTTP server
cargo run --example axum_server

# Advanced streaming demo server  
cargo run --example streaming_demo_server

# Simple usage patterns
cargo run --example simple_usage

Then visit http://127.0.0.1:3000 to see priority-based streaming in action.

Use Cases

Perfect for:

Architecture

PJS implements a clean, layered architecture following Domain-Driven Design principles:

1. Domain Layer

Core business logic with value objects (Priority, SessionId, JsonPath) and aggregates (StreamSession) ensuring data consistency.
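As a rough idea of what a value object looks like in this style, here is a validated newtype. It is a sketch under assumptions, not the crate's `Priority` type: the zero check, the constants, and the method names are hypothetical, with the levels chosen to line up with the thresholds used in the WebSocket example (200 and above critical, 100 and above high).

```rust
/// Hypothetical priority value object: a validated newtype over `u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Priority(u8);

impl Priority {
    // Assumed named levels; the real crate may use different values.
    const CRITICAL: Priority = Priority(200);
    const HIGH: Priority = Priority(100);
    const LOW: Priority = Priority(10);

    /// Validates on construction so an invalid priority can never exist.
    /// Rejecting zero is an assumption made for this sketch.
    fn new(value: u8) -> Result<Self, String> {
        if value == 0 {
            Err("priority must be greater than zero".to_string())
        } else {
            Ok(Priority(value))
        }
    }

    /// Returns the raw numeric value.
    fn value(self) -> u8 {
        self.0
    }

    /// True when this priority is at or above the critical level.
    fn is_critical(self) -> bool {
        self >= Self::CRITICAL
    }
}
```

Because the inner value is only reachable through `new` and the constants, code that receives a `Priority` does not need to re-validate it.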

2. Application Layer

CQRS pattern with separate Command and Query handlers, plus high-level services (SessionService, StreamingService) orchestrating workflows.
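The command/query split can be illustrated with a small in-memory store. All names here (`Command`, `Query`, `SessionStore`, and their variants) are invented for the sketch and do not mirror the crate's handler types.

```rust
use std::collections::HashMap;

/// Commands change state.
enum Command {
    CreateSession { id: String },
    EndSession { id: String },
}

/// Queries only read state.
enum Query {
    IsActive { id: String },
    ActiveCount,
}

#[derive(Debug, PartialEq)]
enum QueryResult {
    Bool(bool),
    Count(usize),
}

/// Maps session id to an "active" flag.
#[derive(Default)]
struct SessionStore {
    active: HashMap<String, bool>,
}

impl SessionStore {
    /// Write side: the only place state is mutated.
    fn handle_command(&mut self, cmd: Command) -> Result<(), String> {
        match cmd {
            Command::CreateSession { id } => {
                if self.active.contains_key(&id) {
                    return Err(format!("session {id} already exists"));
                }
                self.active.insert(id, true);
                Ok(())
            }
            Command::EndSession { id } => match self.active.get_mut(&id) {
                Some(flag) => {
                    *flag = false;
                    Ok(())
                }
                None => Err(format!("session {id} not found")),
            },
        }
    }

    /// Read side: takes `&self`, so the compiler guarantees it cannot mutate.
    fn handle_query(&self, query: Query) -> QueryResult {
        match query {
            Query::IsActive { id } => {
                QueryResult::Bool(self.active.get(&id).copied().unwrap_or(false))
            }
            Query::ActiveCount => {
                QueryResult::Count(self.active.values().filter(|a| **a).count())
            }
        }
    }
}
```

Splitting the two paths by `&mut self` versus `&self` lets the borrow checker enforce that queries stay free of side effects.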

3. Infrastructure Layer

Adapters implementing domain ports:

4. Transport Abstraction

Multi-format streaming support:

5. Advanced Streaming

Intelligent frame processing:

Technical Architecture

pjs/
├── crates/
│   ├── pjs-core/           # Core protocol, domain logic, and HTTP integration
│   │   ├── src/
│   │   │   ├── application/    # CQRS handlers, services, DTOs
│   │   │   ├── domain/         # Value objects, entities, aggregates
│   │   │   ├── infrastructure/ # HTTP, WebSocket, repositories, adapters
│   │   │   ├── parser/         # SIMD, zero-copy, buffer pools
│   │   │   ├── stream/         # Priority streaming, reconstruction
│   │   │   └── compression/    # Schema-based compression
│   │   ├── examples/           # Standalone demos (zero-copy, compression)
│   │   └── tests/              # Integration tests
│   ├── pjs-demo/           # Interactive demo servers with WebSocket streaming
│   │   └── src/
│   │       ├── servers/        # Demo server implementations
│   │       ├── clients/        # WebSocket client demos  
│   │       ├── data/           # Sample data generators (analytics, ecommerce)
│   │       └── static/         # HTML interfaces
│   ├── pjs-js-client/      # JavaScript/TypeScript client library ✅ IMPLEMENTED
│   │   ├── src/            # TypeScript source code with transport layers
│   │   ├── tests/          # Jest test suite with full coverage
│   │   └── package.json    # NPM configuration and dependencies
│   └── pjs-bench/          # Benchmarking suite
│       └── benches/        # Criterion.rs performance benchmarks
└── examples/               # Root-level usage examples
    ├── axum_server.rs      # Complete HTTP server demo
    ├── simple_usage.rs     # Basic usage patterns  
    └── streaming_demo_server.rs # Advanced streaming demo

Current Implementation Status

Implemented Components

API Examples

HTTP Endpoints

The server provides a complete REST API:

# Create a new session
POST /pjs/sessions
Content-Type: application/json
{
  "max_concurrent_streams": 10,
  "timeout_seconds": 3600,
  "client_info": "My App v1.0"
}

# Response: { "session_id": "sess_abc123", "expires_at": "..." }

# Get session info  
GET /pjs/sessions/{session_id}

# Start streaming data
POST /pjs/stream/{session_id}
Content-Type: application/json
{
  "data": { "users": [...], "products": [...] },
  "priority_threshold": 50,
  "max_frames": 100
}

# Stream frames (JSON format)
GET /pjs/stream/{session_id}/frames?format=json&priority=80

# Real-time Server-Sent Events
GET /pjs/stream/{session_id}/sse
Accept: text/event-stream

# System health check
GET /pjs/health
# Response: { "status": "healthy", "version": "0.3.0" }
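The `priority_threshold` and `max_frames` request fields are not specified further above. One plausible reading, sketched here as an assumption, is that frames below the threshold are dropped and the remainder is capped after sorting by priority. `Frame` and `select_frames` are hypothetical names.

```rust
/// Hypothetical frame with a numeric priority and a payload label.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    priority: u8,
    payload: &'static str,
}

/// Keeps frames at or above `priority_threshold`, orders them highest
/// priority first, and returns at most `max_frames` of them.
fn select_frames(frames: &[Frame], priority_threshold: u8, max_frames: usize) -> Vec<Frame> {
    let mut selected: Vec<Frame> = frames
        .iter()
        .filter(|f| f.priority >= priority_threshold)
        .cloned()
        .collect();
    // Stable sort: frames with equal priority keep their input order.
    selected.sort_by(|a, b| b.priority.cmp(&a.priority));
    selected.truncate(max_frames);
    selected
}
```

Under this reading, a request with `"priority_threshold": 50` would never receive frames whose priority is below 50, however many frames `max_frames` allows.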

Working Example

A complete working server is available at examples/axum_server.rs. To run it:

# Start the server
cargo run --example axum_server

# Test endpoints
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Check health  
curl http://localhost:3000/pjs/health

# View metrics
curl http://localhost:3000/examples/metrics

Performance Goals

Building

Prerequisites

Setting up Rust Nightly

# Install nightly Rust
rustup install nightly

# Set nightly for this project
rustup override set nightly

# Or use nightly globally
rustup default nightly

Quick Start

# Clone repository
git clone https://github.com/bug-ops/pjs
cd pjs

# Ensure nightly Rust is active
rustup override set nightly

# Build with optimizations
cargo build --release

# Run tests
cargo test --workspace

# Run the complete HTTP server example
cargo run --example axum_server

# Build with optional features
cargo build --features "http-client,prometheus-metrics"

Feature Flags

Framework Integration

Universal Integration Layer with Zero-Cost Abstractions

PJS uses nightly Rust features to keep its framework integration layer free of runtime overhead. The Universal Framework Integration Layer combines Generic Associated Types (GATs) with impl Trait in associated-type position, so adapter methods return unboxed futures with no heap allocation or dynamic dispatch:

use pjson_rs::infrastructure::integration::StreamingAdapter;
use std::future::Future;

// Zero-cost framework integration with GATs
impl StreamingAdapter for YourFramework {
    type Request = YourRequest;
    type Response = YourResponse;
    type Error = YourError;

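    // Unboxed future, no Box allocation. Using impl Trait in an associated type
    // needs nightly with #![feature(impl_trait_in_assoc_type)] at the crate root.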
    type StreamingResponseFuture<'a> = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    fn create_streaming_response<'a>(
        &'a self,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
        format: StreamingFormat,
    ) -> Self::StreamingResponseFuture<'a> {
        // Direct async block - compiler generates optimal Future type
        async move {
            // Your framework-specific logic here
            Ok(your_response)
        }
    }

    fn framework_name(&self) -> &'static str {
        "your_framework"
    }
}
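To see what "no Box allocation" is being compared against, the sketch below puts a boxed-future trait next to an unboxed one. It is a stand-alone illustration rather than the crate's `StreamingAdapter`: the traits, `Demo`, and `poll_once` are hypothetical, and the unboxed variant uses return-position impl Trait in traits (stable since Rust 1.75) instead of the nightly associated-type form shown above. `std::future::ready` stands in for an async block to keep the example short.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed style: each call heap-allocates and dispatches dynamically.
trait BoxedAdapter {
    fn respond<'a>(&'a self, frames: usize) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>>;
}

/// Unboxed style: the concrete future type is known at compile time.
trait UnboxedAdapter {
    fn respond(&self, frames: usize) -> impl Future<Output = usize> + Send;
}

struct Demo {
    overhead: usize,
}

impl BoxedAdapter for Demo {
    fn respond<'a>(&'a self, frames: usize) -> Pin<Box<dyn Future<Output = usize> + Send + 'a>> {
        Box::pin(std::future::ready(frames + self.overhead))
    }
}

impl UnboxedAdapter for Demo {
    fn respond(&self, frames: usize) -> impl Future<Output = usize> + Send {
        std::future::ready(frames + self.overhead)
    }
}

/// Waker that does nothing; enough for futures that are immediately ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and returns its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Both adapters produce the same result; the difference is that the boxed version pays for an allocation and a vtable call per request, while the unboxed version lets the compiler inline the future.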

Performance Benefits of Nightly Rust

Zero-Cost Abstractions:

Currently Supported

Integration Examples

// Axum (native support)
use pjson_rs::infrastructure::http::axum_adapter::create_pjs_router;
let app = create_pjs_router().with_state(app_state);

// Custom framework integration
use pjson_rs::infrastructure::adapters::UniversalAdapter;
let adapter = UniversalAdapter::new()
    .with_serializer(your_serializer)
    .with_transport(your_transport);

Production Features

Middleware Stack

The HTTP server includes production-ready middleware:

use pjson_rs::infrastructure::http::middleware::*;

let app = create_pjs_router()
    .layer(axum::middleware::from_fn(pjs_cors_middleware))
    .layer(axum::middleware::from_fn(security_middleware))
    .layer(axum::middleware::from_fn(health_check_middleware))
    .layer(PjsMiddleware::new()
        .with_compression(true)
        .with_metrics(true)
        .with_max_request_size(10 * 1024 * 1024))
    .with_state(app_state);

Monitoring & Metrics

Built-in Prometheus metrics support:

// Automatically tracks:
// - pjs_active_sessions
// - pjs_total_sessions_created  
// - pjs_frames_processed_total
// - pjs_bytes_streamed_total
// - pjs_frame_processing_time_ms

let metrics = collector.export_prometheus();
// Expose at /metrics endpoint for Prometheus scraping
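For readers unfamiliar with what a Prometheus scrape returns, the sketch below renders a few samples in the Prometheus text exposition format. The `Metric` struct and this free-standing `export_prometheus` function are assumptions for illustration; the crate's collector may produce different output (for example with HELP lines or labels).

```rust
/// A single metric sample. `kind` is a Prometheus type such as "gauge" or "counter".
struct Metric {
    name: &'static str,
    kind: &'static str,
    value: u64,
}

/// Renders metrics in the Prometheus text exposition format:
/// a `# TYPE` line followed by a `name value` sample line per metric.
fn export_prometheus(metrics: &[Metric]) -> String {
    let mut out = String::new();
    for m in metrics {
        out.push_str(&format!("# TYPE {} {}\n{} {}\n", m.name, m.kind, m.name, m.value));
    }
    out
}
```

A gauge suits values that go up and down, such as `pjs_active_sessions`, while monotonically increasing totals such as `pjs_frames_processed_total` are counters.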

Event System

Comprehensive domain event tracking:

// Events automatically generated:
// - SessionCreated, SessionActivated, SessionEnded
// - StreamStarted, StreamCompleted, FrameGenerated
// - PriorityAdjusted, ErrorOccurred

publisher.subscribe("SessionCreated", |event| {
    println!("New session: {}", event.session_id());
});
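The subscribe call above follows a publish/subscribe pattern. A minimal in-process version can be sketched as follows; `EventBus` and its methods are hypothetical, and the event payload is reduced to a session id string.

```rust
use std::collections::HashMap;

/// Hypothetical in-process event bus keyed by event name.
#[derive(Default)]
struct EventBus {
    handlers: HashMap<String, Vec<Box<dyn Fn(&str)>>>,
}

impl EventBus {
    /// Registers `handler` to run whenever `event` is published.
    fn subscribe<F>(&mut self, event: &str, handler: F)
    where
        F: Fn(&str) + 'static,
    {
        self.handlers
            .entry(event.to_string())
            .or_default()
            .push(Box::new(handler));
    }

    /// Calls every handler registered for `event` and returns how many ran.
    fn publish(&self, event: &str, session_id: &str) -> usize {
        match self.handlers.get(event) {
            Some(list) => {
                for handler in list {
                    handler(session_id);
                }
                list.len()
            }
            None => 0,
        }
    }
}
```

Publishing an event nobody subscribed to is a no-op here, which keeps producers independent of which consumers exist.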

Contributing

We welcome contributions! See CONTRIBUTING.md for guidelines.

Development Setup

# Install development tools
rustup component add clippy rustfmt

# Run checks
cargo clippy --workspace
cargo fmt --check

# Run all tests
cargo test --workspace --all-features

License

Licensed under either of:

at your option.

Getting Started Right Now

Want to try PJS immediately? Here's the fastest way:

# Clone and run
git clone https://github.com/bug-ops/pjs
cd pjs

# Set nightly Rust (required)
rustup override set nightly

# Run the server
cargo run --example axum_server

# In another terminal, test the API
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Try Server-Sent Events streaming  
curl -N -H "Accept: text/event-stream" \
  http://localhost:3000/pjs/stream/{session_id}/sse

Running Performance Benchmarks

To verify the performance claims, run the comprehensive benchmark suite:

# Run all benchmarks
cargo bench -p pjs-bench

# Or run specific benchmarks:
cargo bench -p pjs-bench --bench simple_throughput    # Core parsing speed
cargo bench -p pjs-bench --bench memory_benchmarks    # Memory efficiency  
cargo bench -p pjs-bench --bench streaming_benchmarks # Progressive loading

Results show PJS is 6.3x faster than serde_json and 1.06x faster than sonic-rs on large JSON (357KB).

The server will show:

Roadmap

Next Steps

Acknowledgments

Built with:

Community


PJS: Because users shouldn't wait for data they don't need yet.
