#hyper #stream #writer #async #data-streaming #rust

stream-body

An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper

2 releases

0.1.1 Apr 6, 2020
0.1.0 Apr 6, 2020

#2173 in Asynchronous


Used in json-response

MIT license

21KB
309 lines


Motivation

hyper's existing Body type uses Bytes as its streaming chunk, so streaming large amounts of data in real time incurs many buffer allocations and deallocations. StreamBody addresses this. It implements HttpBody and uses a &[u8] slice as the streaming chunk, so the same buffer can be reused instead of allocating a new one for each chunk, which avoids that allocation/deallocation overhead.

Similarly, the channel() method on hyper's Body returns a pair of a Sender and a Body. The Sender accepts Bytes as its data chunk, which again incurs allocation/deallocation overhead. To solve this, StreamBody provides StreamBody::channel(), which returns a pair of an AsyncWrite and the StreamBody itself. Because the AsyncWrite accepts &[u8] instead of Bytes, writes avoid the per-chunk allocation/deallocation overhead.
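The allocation difference described above can be illustrated with the standard library alone. The sketch below is not stream-body's implementation and does not use hyper. The functions copy_with_owned_chunks and copy_with_reused_buffer are hypothetical names introduced here, and the synchronous std::io::Read and std::io::Write traits stand in for the async ones. It only contrasts allocating an owned chunk per read with reusing one buffer and passing &[u8] slices to the writer.

```rust
use std::io::{self, Read, Write};

/// Hypothetical helper: copies `src` to `dst`, allocating a fresh owned
/// chunk for every read. This mimics an API that requires an owned chunk
/// per send. Returns (bytes copied, number of chunk allocations).
fn copy_with_owned_chunks<R: Read, W: Write>(
    mut src: R,
    mut dst: W,
    chunk_size: usize,
) -> io::Result<(usize, usize)> {
    let mut total = 0;
    let mut allocations = 0;
    loop {
        // A new heap allocation on every iteration.
        let mut chunk = vec![0_u8; chunk_size];
        allocations += 1;
        let n = src.read(&mut chunk)?;
        if n == 0 {
            break; // end of input
        }
        chunk.truncate(n);
        dst.write_all(&chunk)?;
        total += n;
        // `chunk` is dropped (deallocated) here.
    }
    Ok((total, allocations))
}

/// Hypothetical helper: copies `src` to `dst` through one buffer that is
/// allocated once and reused, handing the writer a borrowed `&[u8]` slice.
/// Returns (bytes copied, number of chunk allocations).
fn copy_with_reused_buffer<R: Read, W: Write>(
    mut src: R,
    mut dst: W,
    chunk_size: usize,
) -> io::Result<(usize, usize)> {
    // The only allocation made for chunk storage.
    let mut buf = vec![0_u8; chunk_size];
    let allocations = 1;
    let mut total = 0;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            break; // end of input
        }
        // Only the filled part of the buffer is written.
        dst.write_all(&buf[..n])?;
        total += n;
    }
    Ok((total, allocations))
}
```

Both helpers produce the same output bytes. The difference is that the first allocates once per loop iteration, while the second allocates once in total, which is the pattern a &[u8]-based writer makes possible.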

Usage

First, add this to your Cargo.toml:

[dependencies]
stream-body = "0.1"

An example of streaming a large file:

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr};
use stream_body::StreamBody;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
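    // `writer` implements AsyncWrite; `body` is the response body it feeds.
    let (mut writer, body) = StreamBody::channel();

    // Stream the file from a background task so the response can be returned right away.
    tokio::spawn(async move {
        let mut f = File::open("large-file").await.unwrap();

        // A single 16 KiB buffer, reused for every chunk.
        let mut buf = [0_u8; 1024 * 16];
        loop {
            let read_count = f.read(&mut buf).await.unwrap();
            // A read of zero bytes means end of file.
            if read_count == 0 {
                break;
            }
            // Write only the bytes that were actually read.
            writer.write_all(&buf[..read_count]).await.unwrap();
        }
    });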

    Ok(Response::builder().body(body).unwrap())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

Contributing

Your PRs and stars are always welcome.

Dependencies

~4MB
~59K SLoC