stream-body

Crates.io: stream-body
lib.rs: stream-body
version: 0.1.1
source: src
created_at: 2020-04-06 14:47:42.428978
updated_at: 2020-04-06 18:49:28.066828
description: An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper
homepage: https://github.com/rousan/stream-body
repository: https://github.com/rousan/stream-body
max_upload_size:
id: 226944
size: 39,565
Rousan Ali (rousan)

stream-body

crates.io Documentation MIT

An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper.

Motivation

The existing Body type in hyper uses Bytes as its streaming chunk type. As a result, streaming large amounts of data in real time causes many buffer allocations and deallocations, because every chunk needs its own Bytes value. StreamBody is designed for this situation. It implements HttpBody and uses a &[u8] slice as the streaming chunk, so the same buffer can be reused for every chunk instead of allocating a new one each time, which avoids that allocation/deallocation overhead.

Also, the channel() method on hyper's Body returns a pair of a Sender and a Body. The Sender accepts Bytes as its data chunk, which again incurs allocation/deallocation overhead. To address this, StreamBody provides StreamBody::channel(), which returns a pair of an AsyncWrite and the StreamBody itself. Because the AsyncWrite accepts &[u8] instead of Bytes, the caller can keep writing from one reused buffer without allocating per chunk.
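The buffer-reuse idea described above can be illustrated with the standard library alone. The sketch below is a synchronous analogy and not stream-body's API: copy_with_reused_buffer and copy_with_owned_chunks are hypothetical helper names, and std::io::Read and std::io::Write stand in for the async reader and writer. It only contrasts handing on a borrowed &[u8] slice with creating an owned chunk for every read.

```rust
use std::io::{self, Read, Write};

/// Copies `src` into `dst` through one caller-provided buffer.
/// Each chunk is passed on as a borrowed `&[u8]`, so the loop itself never allocates.
fn copy_with_reused_buffer<R: Read, W: Write>(
    src: &mut R,
    dst: &mut W,
    buf: &mut [u8],
) -> io::Result<u64> {
    let mut total = 0u64;
    loop {
        let n = src.read(buf)?;
        if n == 0 {
            return Ok(total); // end of input
        }
        dst.write_all(&buf[..n])?;
        total += n as u64;
    }
}

/// For contrast: every chunk is copied into its own heap-allocated `Vec<u8>`
/// (a stand-in for an owned chunk type). Returns how many chunks were allocated.
fn copy_with_owned_chunks<R: Read, W: Write>(
    src: &mut R,
    dst: &mut W,
    chunk_size: usize,
) -> io::Result<usize> {
    let mut scratch = vec![0u8; chunk_size];
    let mut allocations = 0;
    loop {
        let n = src.read(&mut scratch)?;
        if n == 0 {
            return Ok(allocations);
        }
        let owned: Vec<u8> = scratch[..n].to_vec(); // one allocation per chunk
        allocations += 1;
        dst.write_all(&owned)?;
    }
}

fn main() -> io::Result<()> {
    let data = vec![7u8; 100_000];

    // Borrowed-slice path: a single 16 KiB stack buffer serves every chunk.
    let mut src = data.as_slice();
    let mut out = Vec::new();
    let mut buf = [0u8; 16 * 1024];
    let copied = copy_with_reused_buffer(&mut src, &mut out, &mut buf)?;
    println!("reused buffer: copied {} bytes", copied);

    // Owned-chunk path: same bytes, but one extra allocation per chunk.
    let mut src = data.as_slice();
    let mut out = Vec::new();
    let allocations = copy_with_owned_chunks(&mut src, &mut out, 16 * 1024)?;
    println!("owned chunks: {} chunk allocations", allocations);
    Ok(())
}
```

Both helpers produce the same output bytes. The difference is only where the chunk memory comes from, which is the overhead the Motivation section is concerned with.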

Usage

First add this to your Cargo.toml:

[dependencies]
stream-body = "0.1"

An example of streaming a large file:

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr};
use stream_body::StreamBody;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

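// Streams "large-file" to the client through a single reusable buffer.
async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
    // `writer` is the AsyncWrite half; `body` is the StreamBody returned in the response.
    let (mut writer, body) = StreamBody::channel();

    // Feed the body from a background task so the response can be returned right away.
    tokio::spawn(async move {
        let mut f = File::open("large-file").await.unwrap();

        // Reuse this buffer for every chunk
        let mut buf = [0_u8; 1024 * 16];
        loop {
            let read_count = f.read(&mut buf).await.unwrap();
            if read_count == 0 {
                // End of file
                break;
            }
            writer.write_all(&buf[..read_count]).await.unwrap();
        }
    });

    Ok(Response::builder().body(body).unwrap())
}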

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

Contributing

Your PRs and stars are always welcome.

Commit count: 11
