Crates.io | stream-body |
lib.rs | stream-body |
version | 0.1.1 |
source | src |
created_at | 2020-04-06 14:47:42.428978 |
updated_at | 2020-04-06 18:49:28.066828 |
description | An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper |
homepage | https://github.com/rousan/stream-body |
repository | https://github.com/rousan/stream-body |
id | 226944 |
size | 39,565 |
An HttpBody implementation with efficient streaming support for the Rust HTTP library hyper.
The existing Body type in hyper uses Bytes as its streaming chunk type. As a result, streaming large amounts of data in real time causes many buffer allocations and deallocations.
StreamBody is designed for this situation. It implements HttpBody and uses a &[u8] slice as the streaming chunk, so the same buffer can be reused instead of allocating a new one for each chunk, which avoids the allocation/deallocation overhead.
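The difference between owned chunks and a reused buffer can be sketched with the standard library alone. This is only an illustration of the idea, not the crate's implementation: the function names `stream_with_owned_chunks` and `stream_with_reused_buffer` are hypothetical, a `Vec<u8>` stands in for an owned chunk such as Bytes, and the code is synchronous for brevity.

```rust
use std::io::{self, Read};

/// Reads `src` to the end, allocating a new buffer for every chunk.
/// Returns (total bytes read, number of buffers allocated).
/// Hypothetical helper: `Vec<u8>` stands in for an owned chunk type.
fn stream_with_owned_chunks<R: Read>(
    mut src: R,
    chunk_size: usize,
) -> io::Result<(usize, usize)> {
    let mut total = 0;
    let mut buffers_allocated = 0;
    loop {
        // An owned chunk cannot be reused once it is handed off, so each
        // iteration starts with a fresh heap allocation.
        let mut chunk = vec![0_u8; chunk_size];
        buffers_allocated += 1;
        let n = src.read(&mut chunk)?;
        if n == 0 {
            return Ok((total, buffers_allocated));
        }
        chunk.truncate(n);
        total += chunk.len(); // stand-in for handing the chunk to a consumer
    }
}

/// Reads `src` to the end through one buffer that is lent out as `&[u8]`.
/// Returns (total bytes read, number of buffers allocated).
fn stream_with_reused_buffer<R: Read>(
    mut src: R,
    chunk_size: usize,
) -> io::Result<(usize, usize)> {
    let mut buf = vec![0_u8; chunk_size]; // the only allocation
    let mut total = 0;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            return Ok((total, 1));
        }
        let chunk: &[u8] = &buf[..n]; // borrowed view, no new allocation
        total += chunk.len(); // stand-in for handing the slice to a consumer
    }
}
```

For 64 KiB of in-memory input read in 16 KiB chunks, the first function allocates five buffers (four data chunks plus the final read that detects the end of input), while the second allocates one.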
Also, the channel() method of hyper's Body returns a pair of a Sender and a Body. The Sender accepts Bytes as a data chunk, which again creates allocation/deallocation overhead.
To solve this, StreamBody provides StreamBody::channel(), which returns a pair of an AsyncWrite and the StreamBody itself. Because the AsyncWrite accepts &[u8] instead of Bytes, the caller can keep writing from a single reused buffer, which avoids that allocation/deallocation overhead.
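The two producer-side API shapes can be compared in a small standard-library sketch. Everything here is an assumption made for illustration: `std::sync::mpsc::Sender<Vec<u8>>` stands in for a sender that takes owned chunks, `std::io::Write` stands in for AsyncWrite, and the names `produce_owned` and `produce_borrowed` are hypothetical. It does not reproduce hyper's or stream-body's internals.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc;

/// Feeds `src` into a sender that takes ownership of each chunk.
/// Returns the number of chunks sent.
fn produce_owned<R: Read>(mut src: R, tx: mpsc::Sender<Vec<u8>>) -> io::Result<usize> {
    let mut buf = [0_u8; 4096];
    let mut sent = 0;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            return Ok(sent);
        }
        // The sender needs an owned value, so the bytes are copied into a
        // newly allocated Vec for every chunk.
        tx.send(buf[..n].to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        sent += 1;
    }
}

/// Feeds `src` into a writer that only borrows each chunk.
/// Returns the number of bytes written.
fn produce_borrowed<R: Read, W: Write>(mut src: R, mut sink: W) -> io::Result<usize> {
    let mut buf = [0_u8; 4096];
    let mut written = 0;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            return Ok(written);
        }
        // The writer borrows the slice, so the producer does not allocate
        // per chunk and `buf` is free to be refilled on the next iteration.
        sink.write_all(&buf[..n])?;
        written += n;
    }
}
```

With the borrowed form, whether any copying happens is decided by the sink rather than forced on the producer, which is the property the AsyncWrite half of StreamBody::channel() relies on.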
First, add this to your Cargo.toml:
[dependencies]
stream-body = "0.1"
An example of streaming a large file:
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr};
use stream_body::StreamBody;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn handle(_: Request<Body>) -> Result<Response<StreamBody>, Infallible> {
    let (mut writer, body) = StreamBody::channel();

    // Stream the file from a separate task so the response can be returned right away.
    tokio::spawn(async move {
        let mut f = File::open("large-file").await.unwrap();

        // Reuse this buffer
        let mut buf = [0_u8; 1024 * 16];
        loop {
            let read_count = f.read(&mut buf).await.unwrap();
            // A read of zero bytes means the end of the file was reached.
            if read_count == 0 {
                break;
            }
            writer.write_all(&buf[..read_count]).await.unwrap();
        }
    });

    Ok(Response::builder().body(body).unwrap())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
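The example above also uses hyper and tokio directly, so they need to be listed as dependencies too. A possible manifest is sketched here; the hyper and tokio versions are assumptions based on the releases that were current when stream-body 0.1.1 was published, so check the crate's own manifest for the versions it actually expects.

```toml
[dependencies]
stream-body = "0.1"
# Assumed versions, not stated in this README
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
```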
Your PRs and stars are always welcome.