otlp2pipeline

Crates.io: otlp2pipeline
lib.rs: otlp2pipeline
version: 0.4.0
created_at: 2026-01-12 03:42:29.941646+00
updated_at: 2026-01-19 03:39:20.386609+00
description: OTLP ingestion worker for Cloudflare Pipelines and AWS
repository: https://github.com/smithclay/otlp2pipeline
id: 2036955
size: 761,961
Clay Smith (smithclay)


README

otlp2pipeline

Stream OpenTelemetry data to Cloudflare R2 Data Catalog, Amazon S3 Tables, or Azure ADLS Gen2.

What it does

Receives OpenTelemetry data and routes it to object storage via cloud-native pipelines. Data lands in Parquet format using a ClickHouse-inspired schema.

Cloud services handle batching and format conversion. Catalog maintenance (compaction, snapshot expiration) runs automatically. Just want simple OTLP->Parquet conversion without the cloud bells and whistles? Check out otlp2parquet.
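
For orientation, here is a rough sketch of pointing an OpenTelemetry SDK or Collector at a deployed endpoint using the standard OTLP exporter environment variables. The URL and token are placeholders, and the http/json encoding is an assumption based on the input validation notes below; `otlp2pipeline connect` prints the exact settings for your deployment.

# Placeholder endpoint and token; `otlp2pipeline connect` prints the real values.
# Assumes OTLP over HTTP with JSON encoding, per the input validation notes below.
export OTEL_EXPORTER_OTLP_ENDPOINT="https://your-endpoint.example.com"
export OTEL_EXPORTER_OTLP_PROTOCOL="http/json"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer ${OTLP_AUTH_TOKEN}"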

Why?

  • Deploy a managed Iceberg observability backend with one command
  • Query with any tool that reads Iceberg: DuckDB, Pandas, Trino, Athena, AI agents
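
For example, a minimal DuckDB sketch with its iceberg extension might look like the lines below. The table location is a placeholder and credential/secret setup is omitted; `otlp2pipeline query` prints ready-made queries for your actual deployment.

# Minimal sketch using DuckDB's iceberg extension. The table path is a placeholder
# and credential/secret setup is omitted; `otlp2pipeline query` prints ready-made
# queries for your deployment.
duckdb -c "
INSTALL iceberg; LOAD iceberg;
SELECT count(*) FROM iceberg_scan('s3://your-table-bucket/namespace/logs');
"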

Quickstart

Install the CLI for your favorite cloud provider:

# requires rust toolchain: `curl https://sh.rustup.rs -sSf | sh`
cargo install otlp2pipeline

# Create a new project to deploy on AWS (requires AWS CLI)
otlp2pipeline init --provider aws --env awstest01 --region us-east-1

# or create a new project with Cloudflare (requires the wrangler CLI)
otlp2pipeline init --provider cf --env cftest01

# or create a new project with Azure (requires Azure CLI)
otlp2pipeline init --provider azure --env azuretest01 --region westus

# see what will be created automatically
otlp2pipeline plan

Deploy to Cloudflare

Requires the wrangler CLI.

# 1. `init` a Cloudflare project as described above

# 2. Create R2 API token (Admin Read & Write)
#    https://dash.cloudflare.com/?to=/:account/r2/api-token

# 3. Create pipelines
otlp2pipeline create --r2-token $R2_API_TOKEN --output wrangler.toml

# 3a. Set token for worker to write to pipeline
# Go to https://dash.cloudflare.com/?to=/:account/api-tokens
# Create key with "Workers Pipelines: Edit" permissions
npx wrangler secret put PIPELINE_AUTH_TOKEN

# 3b. Deploy worker defined in wrangler.toml
npx wrangler deploy

Deploy to AWS or Azure

Requires the AWS CLI or Azure CLI configured with appropriate credentials to create resources.

# 1. `init` an AWS/Azure project as described above

# 2. Deploy (authentication is enabled by default)
otlp2pipeline create

Check status and connect Claude Code, Collectors, or Codex

# Verify successful deployment
otlp2pipeline status

# How to stream telemetry from different sources
otlp2pipeline connect

# Query tables with DuckDB (by default, data is available after ~5 minutes)
otlp2pipeline query

Cloudflare

Worker Architecture

flowchart TB
    subgraph Ingest["Ingest + Store"]
        OTLP[OTLP] --> W[Worker]
        W --> P[Pipelines]
        W --> DOs[(Durable Objects)]
        P --> R2[(R2 Data Catalog / Iceberg)]
    end

    subgraph Query["Query"]
        CLI[CLI / WebSocket] -->|real-time| W
        DuckDB[🦆 DuckDB] --> R2
    end

The worker uses Durable Objects for real-time RED metrics; see openapi.yaml for API details.

Cloudflare-specific CLI commands

Cloudflare deployments include additional features such as live log/trace tailing and real-time RED metrics.

# List known services
otlp2pipeline services --url https://your-worker.workers.dev

# Stream live logs
otlp2pipeline tail my-service logs

# Stream live traces
otlp2pipeline tail my-service traces

AWS

Lambda Architecture

flowchart TB
    subgraph Ingest["Ingest + Store"]
        OTLP[OTLP] --> L[Lambda]
        L --> F[Data Firehose]
        F --> S3[(S3 Tables / Iceberg)]
    end

    subgraph Query["Query"]
        DuckDB[🦆 DuckDB] --> S3
        Athena[Athena / Trino] --> S3
    end

Azure

Fabric users: Eventstreams can replace Stream Analytics for Azure Data Lake ingestion.

Stream Analytics Architecture

flowchart TB
    subgraph Ingest["Ingest + Store"]
        OTLP[OTLP] --> CA[Container App]
        CA --> EH[Event Hub]
        EH --> SA[Stream Analytics]
        SA --> ADLS[(ADLS Gen2 / Parquet)]
    end

    subgraph Query["Query"]
        DuckDB[🦆 DuckDB] --> ADLS
        Synapse[Synapse / Spark] --> ADLS
    end

Schema

Schemas come from the otlp2records library and are generated at build time. The same schema is used by the otlp2parquet and duckdb-otlp projects.
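
To inspect the concrete columns, you can describe a Parquet file produced by the pipeline with DuckDB. This is only a sketch with a placeholder path (remote object-store reads need the httpfs or azure extension); the authoritative definitions live in otlp2records.

# Sketch: inspect the generated columns from a Parquet file (placeholder path;
# remote object-store reads need the httpfs or azure extension).
duckdb -c "DESCRIBE SELECT * FROM read_parquet('path/to/exported/logs.parquet');"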

Performance

Compaction and snapshot expiration run automatically where supported.

Security

Authentication

Bearer token authentication is enabled by default. Use --no-auth to disable (not recommended for production).
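
As a sketch of what an authenticated request looks like, assuming the worker exposes the standard OTLP/HTTP paths such as /v1/logs (the endpoint and token below are placeholders for values from your own deployment):

# Sketch of an authenticated OTLP/HTTP request. Assumes the standard /v1/logs path;
# substitute the endpoint and token from your own deployment.
curl -sS -X POST "https://your-endpoint.example.com/v1/logs" \
  -H "Authorization: Bearer $OTLP_AUTH_TOKEN" \
  -H "Content-Type: application/json" \
  --data '{"resourceLogs":[]}'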

Input Validation

  • Maximum payload size: 10 MB (after decompression)
  • Invalid JSON or timestamps are rejected with 400 errors
  • Service names: alphanumeric, hyphens, underscores, dots only (max 128 chars)
  • Service registry limit: 10,000 unique services (returns 507 if exceeded)