futures-to-stream

Crates.io: futures-to-stream
lib.rs: futures-to-stream
version: 0.2.0
source: src
created_at: 2022-03-19 16:29:29.272932
updated_at: 2022-03-19 16:29:29.272932
description: Turn heterogeneous Futures of same associated Output type to a stream
homepage: https://github.com/414owen/futures-to-stream/
repository: https://github.com/414owen/futures-to-stream/
id: 553231
size: 11,101
Owen Shepherd (414owen)

Futures to stream

Macros to create streams from heterogeneous futures

Usage

async fn test1() -> u8 { 1 }
async fn test2() -> u8 { 2 }
async fn test3() -> u8 { 3 }
async fn test4() -> u8 { 4 }

fn ordered_stream() -> impl Stream<Item = u8> {
  futures_to_ordered_stream!(test1(), test2(), test3(), test4())
}

fn unordered_stream() -> impl Stream<Item = u8> {
  futures_to_unordered_stream!(test1(), test2(), test3(), test4())
}

Goal

To allow the creation of a stream from heterogeneous Futures that share the same associated Output type.

Problem

The way to create a Stream from a set of Futures is to create a FuturesOrdered or a FuturesUnordered. However, as these store Futures of the single type they're parameterized over, you can't give them a heterogeneous set of Futures.

Here's an example that compiles:

use futures::stream::{FuturesOrdered, Stream};

async fn test1() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test1());
  futs
}

Here's an example that doesn't compile:

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(test1());
  futs.push(test2()); // Error: expected opaque type, found a different opaque type
  futs
}

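Great, very helpful, rustc. We've created the exact same function under a different name, and its Future is somehow a different type. The reason is that every async fn returns its own anonymous Future type, so test1() and test2() never share a type, even with identical bodies.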
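To make the "different opaque type" error concrete, here is a minimal sketch using only the standard library. The helper names (type_id_of, same_fn_same_type, different_fn_same_type) are made up for illustration. It compares the TypeId of the futures returned by two identical-looking async fns.

```rust
use std::any::TypeId;

async fn test1() -> () { () }
async fn test2() -> () { () }

/// Hypothetical helper: returns the `TypeId` of the value passed in.
fn type_id_of<T: 'static>(_value: &T) -> TypeId {
    TypeId::of::<T>()
}

/// Two calls to the same `async fn` produce futures of the same type.
fn same_fn_same_type() -> bool {
    type_id_of(&test1()) == type_id_of(&test1())
}

/// Calls to two different `async fn`s produce futures of different types,
/// even though the signatures and bodies are identical.
fn different_fn_same_type() -> bool {
    type_id_of(&test1()) == type_id_of(&test2())
}
```

Here same_fn_same_type() is true and different_fn_same_type() is false, which is why a collection parameterized over a single future type accepts the first example and rejects the second.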

Well, there is a fairly easy way to combine two different futures: use future::Either.

use futures::future::Either;
use futures::stream::{FuturesOrdered, Stream};

async fn test1() -> () { () }

async fn test2() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(test2()));
  futs
}
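Why does wrapping in Either help? The sketch below is a hypothetical, std-only stand-in for futures::future::Either, not the futures crate's actual code. It shows the idea: the enum is itself a Future whenever both sides are futures with the same Output, so two different future types collapse into one. The names NoopWake, poll_once and run_both are illustrative assumptions, and the async fns return u8 here so there is a result to inspect.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::future::Either`.
enum Either<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Future for Either<A, B>
where
    A: Future,
    B: Future<Output = A::Output>,
{
    type Output = A::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the inner future is only re-pinned in place; it is never
        // moved out of `self`.
        unsafe {
            match self.get_unchecked_mut() {
                Either::Left(a) => Pin::new_unchecked(a).poll(cx),
                Either::Right(b) => Pin::new_unchecked(b).poll(cx),
            }
        }
    }
}

/// A waker that does nothing; sufficient for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and returns its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

async fn test1() -> u8 { 1 }
async fn test2() -> u8 { 2 }

/// Two different future types stored in one homogeneous Vec.
fn run_both() -> Vec<u8> {
    let futs = vec![Either::Left(test1()), Either::Right(test2())];
    futs.into_iter().filter_map(poll_once).collect()
}
```

Both elements of the Vec have the single type Either<F1, F2>, which is what a container parameterized over one future type needs.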

That's great, now let's try with four Futures.

async fn test1() -> () { () }
async fn test2() -> () { () }
async fn test3() -> () { () }
async fn test4() -> () { () }

fn to_stream() -> impl Stream<Item = ()> {
  let mut futs = FuturesOrdered::new();
  futs.push(Either::Left(test1()));
  futs.push(Either::Right(Either::Left(test2())));
  futs.push(Either::Right(Either::Right(Either::Left(test3()))));
  futs.push(Either::Right(Either::Right(Either::Right(test4()))));
  futs
}

With four futures, the nesting is already pretty unwieldy. Luckily, this crate exports macros that generate all of it for you.
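For intuition only, here is one way such a macro could build the nesting. This is a sketch under assumptions, not the crate's actual implementation: nested_eithers and wrap_rights are hypothetical names, Either is a local stand-in enum, and the result is a plain Vec rather than a stream. The point is to show how the Right/Left wrappers can be generated mechanically.

```rust
/// Hypothetical stand-in for `futures::future::Either`.
enum Either<A, B> {
    Left(A),
    Right(B),
}

/// Wraps an expression in one `Either::Right` per `R` token.
macro_rules! wrap_rights {
    (() $inner:expr) => { $inner };
    ((R $($rest:tt)*) $inner:expr) => {
        Either::Right(wrap_rights!(($($rest)*) $inner))
    };
}

/// Builds a Vec in which element i is Right^i(Left(..)),
/// and the last element is Right^(n-1)(..).
macro_rules! nested_eithers {
    // Last future: only the accumulated `Right` wrappers, no `Left`.
    (@go ($($depth:tt)*) [$($done:expr),*] $last:expr) => {
        vec![$($done,)* wrap_rights!(($($depth)*) $last)]
    };
    // More futures remain: wrap the head in `Left`, then go one level deeper.
    (@go ($($depth:tt)*) [$($done:expr),*] $head:expr, $($tail:expr),+) => {
        nested_eithers!(
            @go ($($depth)* R)
            [$($done,)* wrap_rights!(($($depth)*) Either::Left($head))]
            $($tail),+
        )
    };
    // Entry point.
    ($($fut:expr),+ $(,)?) => {
        nested_eithers!(@go () [] $($fut),+)
    };
}

async fn test1() -> u8 { 1 }
async fn test2() -> u8 { 2 }
async fn test3() -> u8 { 3 }
async fn test4() -> u8 { 4 }

/// Checks that each element has the same shape as the hand-written version.
fn shapes() -> [bool; 4] {
    let futs = nested_eithers!(test1(), test2(), test3(), test4());
    [
        matches!(&futs[0], Either::Left(_)),
        matches!(&futs[1], Either::Right(Either::Left(_))),
        matches!(&futs[2], Either::Right(Either::Right(Either::Left(_)))),
        matches!(&futs[3], Either::Right(Either::Right(Either::Right(_)))),
    ]
}
```

All four elements end up with the same nested Either type, matching the hand-written pushes in the four-future example.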

See Usage above for the macro-based equivalent.
