tokio-process-stream

Crates.io: tokio-process-stream
lib.rs: tokio-process-stream
version: 0.4.0
source: src
created_at: 2022-01-05 20:48:13.373978
updated_at: 2023-04-02 17:24:27.11613
description: Simple crate that wraps a tokio::process into a tokio::stream
homepage: https://github.com/lpenz/tokio-process-stream
repository: https://github.com/lpenz/tokio-process-stream
max_upload_size:
id: 508678
size: 19,214
Leandro Lisboa Penz (lpenz)

tokio-process-stream

tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream.

A stream interface to processes is useful when we have multiple sources of data that we want to merge and process from a single entry point.

This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements that trait. It yields one Item at a time, each containing one line from either the stdout (Item::Stdout) or the stderr (Item::Stderr) of the underlying process. When the process exits, the stream yields a single Item::Done and finishes.

Example usage:

use tokio_process_stream::ProcessLineStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut sleep_cmd = Command::new("sleep");
    sleep_cmd.arg("1");
    let ls_cmd = Command::new("ls");

    let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
    let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
    let mut procstream = sleep_procstream.merge(ls_procstream);

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }

    Ok(())
}
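
To make the item sequence described above more concrete, here is a minimal synchronous sketch that uses only the standard library. It is not how the crate is implemented: LineItem and collect_items are hypothetical names, threads and a channel stand in for the async stream, and Done carries only a success flag here for simplicity. It only illustrates the shape of the output: lines tagged by their origin, followed by exactly one final "done" item.

```rust
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;

/// Hypothetical local stand-in for the crate's item type (an assumption,
/// not the crate's actual definition).
#[derive(Debug, PartialEq)]
enum LineItem {
    Stdout(String),
    Stderr(String),
    /// Whether the process exited successfully.
    Done(bool),
}

/// Runs `cmd` and collects its output as tagged lines, ending with `Done`.
fn collect_items(mut cmd: Command) -> std::io::Result<Vec<LineItem>> {
    let mut child = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).spawn()?;
    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");
    let (tx, rx) = mpsc::channel();

    // One reader thread per pipe, so neither pipe can fill up and block the child.
    let tx_out = tx.clone();
    let out_reader = thread::spawn(move || {
        for line in BufReader::new(stdout).lines().map_while(Result::ok) {
            let _ = tx_out.send(LineItem::Stdout(line));
        }
    });
    let err_reader = thread::spawn(move || {
        for line in BufReader::new(stderr).lines().map_while(Result::ok) {
            let _ = tx.send(LineItem::Stderr(line));
        }
    });

    // Both pipes are closed once the readers finish; then reap the child.
    out_reader.join().expect("stdout reader panicked");
    err_reader.join().expect("stderr reader panicked");
    let status = child.wait()?;

    // All senders are dropped by now, so iterating the receiver terminates.
    let mut items: Vec<LineItem> = rx.into_iter().collect();
    items.push(LineItem::Done(status.success()));
    Ok(items)
}

fn main() -> std::io::Result<()> {
    let mut cmd = Command::new("sh");
    cmd.arg("-c").arg("echo out; echo err 1>&2");
    for item in collect_items(cmd)? {
        println!("{:?}", item);
    }
    Ok(())
}
```

In this sketch the relative order of the stdout and stderr lines depends on thread scheduling; only the final Done item has a fixed position.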

Streaming chunks

It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.

use tokio_process_stream::{Item, ProcessChunkStream};
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
        .arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
        .try_into()?;

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }
    Ok(())
}
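
The shell command in the example above prints "1/2" without a trailing newline and later overwrites it using a carriage return, so a reader that waits for complete lines would see nothing until the final newline arrives. As a rough illustration of what chunk-wise reading looks like, here is a standard-library-only sketch. It is not the crate's implementation: collect_chunks is a hypothetical helper, it reads synchronously, and it only captures stdout.

```rust
use std::io::{ErrorKind, Read};
use std::process::{Command, Stdio};

/// Reads a child's stdout as raw chunks, one per successful `read` call.
/// Hypothetical helper for illustration only.
fn collect_chunks(mut cmd: Command) -> std::io::Result<Vec<Vec<u8>>> {
    let mut child = cmd.stdout(Stdio::piped()).spawn()?;
    let mut stdout = child.stdout.take().expect("stdout was piped");
    let mut chunks = Vec::new();
    let mut buf = [0u8; 4096];
    loop {
        match stdout.read(&mut buf) {
            Ok(0) => break, // end of file: the child closed its stdout
            Ok(n) => chunks.push(buf[..n].to_vec()),
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    child.wait()?;
    Ok(chunks)
}

fn main() -> std::io::Result<()> {
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#);
    for chunk in collect_chunks(cmd)? {
        // Debug formatting makes the carriage return and newline visible.
        println!("{:?}", String::from_utf8_lossy(&chunk));
    }
    Ok(())
}
```

Where the chunk boundaries fall depends on operating system timing, so only the concatenation of all chunks is predictable.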

Commit count: 17