dagster_pipes_rust

Crates.iodagster_pipes_rust
lib.rsdagster_pipes_rust
version0.1.8
sourcesrc
created_at2024-11-27 17:57:58.315925+00
updated_at2025-02-07 21:20:54.039856+00
descriptionA Dagster pipes implementation for interfacing with Rust
homepage
repository
max_upload_size
id1463465
size471,980
colton (cmpadden)

documentation

README

dagster-pipes-rust

A pipes implementation for the Rust programming language.

Get full observability into your Rust workloads when orchestrating through Dagster. With this light weight interface, you can retrieve data directly from the Dagster context, report asset materializations, report asset checks, provide structured logging, end more.

Crates.io

Usage

Installation

cargo add dagster_pipes_rust

Example

An example project can be found in ./example-dagster-pipes-rust-project.

In this project there exists a rust_processing_jobs binary, which demonstrates how to use the Dagster context to report materializations to Dagster through the context.report_asset_materialization method.

use dagster_pipes_rust::open_dagster_pipes;
use serde_json::json;

fn main() {
    let mut context = open_dagster_pipes();
    let metadata = json!({"row_count": {"raw_value": 100, "type": "int"}});
    context.report_asset_materialization("example_rust_subprocess_asset", metadata);
}
image

It also demonstrates how to run the Rust binary in a subprocess from Dagster. Note, that it's also possible to launch processes in external compute environments like Kubernetes.

import shutil

import dagster as dg


@dg.asset(
    group_name="pipes",
    kinds={"rust"},
)
def example_rust_subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    """Demonstrates running Rust binary in a subprocess."""
    cmd = [shutil.which("cargo"), "run"]
    cwd = dg.file_relative_path(__file__, "../rust_processing_jobs")
    return pipes_subprocess_client.run(
        command=cmd,
        cwd=cwd,
        context=context,
    ).get_materialize_result()


defs = dg.Definitions(
    assets=[example_rust_subprocess_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient(
            context_injector=dg.PipesEnvContextInjector(),
        )
    },
)

Contributing

Prerequisites

For nix users, these dependencies can be installed with nix develop .#rust.

Installation

  1. Install the Python (Dagster) environment, mainly used for testing.
uv sync

This will automatically create a virtual environment in .venv and install all the Python dependencies.

To use the environment, either activate it manually with source ./.venv/bin/activate, or use uv run to execute commands in the context of this environment.

  1. To build the Rust part of the project, use:
cargo build

Testing

The tests can be run with cargo:

cargo test

The integration tests are written in Python and can be run with pytest. The Rust project will be automatically built before running the tests.

uv run pytest

Pipes Schema

We use jsonschema to define the pipes protocol and quicktype to generate the Rust structs. Currently, the json schemas live in jsonschema/pipes but they should be hosted/defined in a centralized repository in the future.

To generate the Rust structs, make sure to install quicktype with npm install -g quicktype. Then run:

cd community-integrations/pipes
make jsonschema_rust
Commit count: 0

cargo fmt