#tokio #process #stream #async-stream #data-streaming

tokio-process-stream

Simple crate that wraps a tokio::process into a tokio::stream

4 releases (breaking)

0.4.0 Apr 2, 2023
0.3.0 Jun 1, 2022
0.2.0 Jan 29, 2022
0.1.0 Jan 5, 2022

#937 in Asynchronous

Download history 504/week @ 2024-07-22 464/week @ 2024-07-29 529/week @ 2024-08-05 379/week @ 2024-08-12 420/week @ 2024-08-19 585/week @ 2024-08-26 401/week @ 2024-09-02 331/week @ 2024-09-09 345/week @ 2024-09-16 419/week @ 2024-09-23 631/week @ 2024-09-30 341/week @ 2024-10-07 422/week @ 2024-10-14 545/week @ 2024-10-21 588/week @ 2024-10-28 400/week @ 2024-11-04

1,974 downloads per month
Used in 4 crates

MIT license

15KB
172 lines

CI coveralls crates.io doc.rs

tokio-process-stream

tokio-process-stream is a simple crate that wraps a tokio::process into a tokio::stream

Having a stream interface to processes is useful when we have multiple sources of data that we want to merge and start processing from a single entry point.

This crate provides a futures::stream::Stream wrapper for tokio::process::Child. The main struct is ProcessLineStream, which implements the trait, yielding one Item enum at a time, each containing one line from either stdout (Item::Stdout) or stderr (Item::Stderr) of the underlying process until it exits. At this point, the stream yields a single Item::Done and finishes.

Example usage:

use tokio_process_stream::ProcessLineStream;
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut sleep_cmd = Command::new("sleep");
    sleep_cmd.args(&["1"]);
    let ls_cmd = Command::new("ls");

    let sleep_procstream = ProcessLineStream::try_from(sleep_cmd)?;
    let ls_procstream = ProcessLineStream::try_from(ls_cmd)?;
    let mut procstream = sleep_procstream.merge(ls_procstream);

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }

    Ok(())
}

Streaming chunks

It is also possible to stream Item<Bytes> chunks with ProcessChunkStream.

use tokio_process_stream::{Item, ProcessChunkStream};
use tokio::process::Command;
use tokio_stream::StreamExt;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut procstream: ProcessChunkStream = Command::new("/bin/sh")
        .arg("-c")
        .arg(r#"printf "1/2"; sleep 0.1; printf "\r2/2 done\n""#)
        .try_into()?;

    while let Some(item) = procstream.next().await {
        println!("{:?}", item);
    }
    Ok(())
}

Dependencies

~4–12MB
~134K SLoC