ex-futures

version: 0.4.9
source: src
created_at: 2017-05-10 08:54:42.118506
updated_at: 2017-12-03 22:09:08.096785
description: An extension of `futures`
homepage: https://github.com/AtsukiTak/ex-futures
repository: https://github.com/AtsukiTak/ex-futures
id: 13971
size: 57,169
Atsuki Takahashi (AtsukiTak)

README

futures_ext

Contributions are very welcome, especially to the English documentation. As you can see, my English is not good, so please help me improve the documentation.

License: MIT, Apache-2.0

An extension of futures.

Document

For now, this crate enables you to

  • create a publish-subscribe channel (will be removed in the future)

  • convert any kind of stream/sink into a cloneable one

  • fork any kind of stream

  • convert an Error associated type that is ()

How to use

Publish-Subscribe channel

Usage is almost the same as futures::unsync::mpsc::unbounded.

use ex_futures::unsync::pubsub::unbounded;
use futures::{Future, Sink, Stream};

fn main() {
    let (tx, rx) = unbounded::<usize>();
    let rx2 = rx.clone(); // Subscriber is cloneable
    let mut rx = rx.wait();
    let mut rx2 = rx2.wait();

    tx.send(1).wait().unwrap();

    // Every subscriber receives the published item
    assert_eq!(rx.next().unwrap().map(|i| *i), Ok(1));
    assert_eq!(rx2.next().unwrap().map(|i| *i), Ok(1));
}
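
To make the publish-subscribe behaviour above more concrete, here is a minimal, synchronous sketch using only the standard library. It is not how ex-futures is implemented. `Publisher`, `Subscriber`, `channel`, `publish` and `try_next` are hypothetical names invented for this illustration, and the choice that a cloned subscriber starts with a copy of the pending messages is an assumption of the sketch.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::{Rc, Weak};

// Hypothetical types for illustration only; none of this is ex-futures API.
type Queue<T> = RefCell<VecDeque<Rc<T>>>;

/// Registry of every live subscriber queue.
struct Shared<T> {
    queues: Vec<Weak<Queue<T>>>,
}

pub struct Publisher<T> {
    shared: Rc<RefCell<Shared<T>>>,
}

pub struct Subscriber<T> {
    shared: Rc<RefCell<Shared<T>>>,
    queue: Rc<Queue<T>>,
}

pub fn channel<T>() -> (Publisher<T>, Subscriber<T>) {
    let shared = Rc::new(RefCell::new(Shared { queues: Vec::new() }));
    let sub = Subscriber::register(&shared, VecDeque::new());
    (Publisher { shared }, sub)
}

impl<T> Publisher<T> {
    /// Wrap the value in `Rc` once and hand a pointer to every live subscriber.
    pub fn publish(&self, value: T) {
        let value = Rc::new(value);
        let mut shared = self.shared.borrow_mut();
        // Forget queues whose subscriber has been dropped.
        shared.queues.retain(|q| q.upgrade().is_some());
        for q in &shared.queues {
            if let Some(q) = q.upgrade() {
                q.borrow_mut().push_back(Rc::clone(&value));
            }
        }
    }
}

impl<T> Subscriber<T> {
    fn register(shared: &Rc<RefCell<Shared<T>>>, pending: VecDeque<Rc<T>>) -> Self {
        let queue = Rc::new(RefCell::new(pending));
        shared.borrow_mut().queues.push(Rc::downgrade(&queue));
        Subscriber { shared: Rc::clone(shared), queue }
    }

    /// Pop the next pending message, if any.
    pub fn try_next(&self) -> Option<Rc<T>> {
        self.queue.borrow_mut().pop_front()
    }
}

impl<T> Clone for Subscriber<T> {
    /// Assumption of this sketch: a clone copies the pending messages and
    /// then receives every later message independently.
    fn clone(&self) -> Self {
        let pending = self.queue.borrow().clone();
        Subscriber::register(&self.shared, pending)
    }
}
```

With this sketch, calling `channel::<usize>()`, cloning the subscriber and then calling `publish(1)` makes `try_next()` return `Some` on both subscribers. Sharing the item through `Rc` is why the example above dereferences with `*i`.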

Cloneable stream/sink

use ex_futures::StreamExt;
use futures::unsync::mpsc::channel;
use futures::{Future, Sink, Stream};

fn main() {
    let (tx, rx) = channel(42);

    let cloneable_rx = rx.unsync_cloneable(); // Convert "rx" into cloneable
    let cloneable_rx2 = cloneable_rx.clone(); // Now you can clone it

    let mut tx = tx.wait();
    tx.send(0).unwrap();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();
    drop(tx); // Close the channel so that collect() can finish

    assert_eq!(cloneable_rx.collect().wait().unwrap(), [0, 1, 2, 3]);
    assert_eq!(cloneable_rx2.collect().wait().unwrap(), [0, 1, 2, 3]);
}
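
One way to picture a cloneable stream is as several handles over one shared source, each handle with its own read position. The sketch below shows that idea with a plain `Iterator` and the standard library only. It is an illustration under that assumption, not the crate's implementation, and `Cloneable` is a hypothetical name.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical type for illustration only; not part of ex-futures.
struct Shared<I: Iterator> {
    source: I,
    buffer: Vec<I::Item>, // every item pulled from `source` so far
}

pub struct Cloneable<I: Iterator> {
    shared: Rc<RefCell<Shared<I>>>,
    pos: usize, // index of the next item this handle will yield
}

impl<I: Iterator> Cloneable<I> {
    pub fn new(source: I) -> Self {
        let shared = Shared { source, buffer: Vec::new() };
        Cloneable { shared: Rc::new(RefCell::new(shared)), pos: 0 }
    }
}

impl<I: Iterator> Clone for Cloneable<I> {
    /// A clone shares the source and starts at the same position.
    fn clone(&self) -> Self {
        Cloneable { shared: Rc::clone(&self.shared), pos: self.pos }
    }
}

impl<I> Iterator for Cloneable<I>
where
    I: Iterator,
    I::Item: Clone,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        let mut shared = self.shared.borrow_mut();
        // Pull from the source only when this handle has caught up with the buffer.
        if self.pos == shared.buffer.len() {
            let item = shared.source.next()?;
            shared.buffer.push(item);
        }
        let item = shared.buffer[self.pos].clone();
        self.pos += 1;
        Some(item)
    }
}
```

For example, `let a = Cloneable::new(0..4); let b = a.clone();` gives two handles that each yield 0, 1, 2, 3. The sketch keeps every item in the buffer forever, which is simple but unbounded. A real implementation would need to discard items once all handles have read them.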

Forked stream

use ex_futures::StreamExt;
use futures::{Future, Stream};

fn main() {
    // gen_stream() is a placeholder for any stream; here it yields 0, 1, 2, 3
    let int_stream = gen_stream();

    let (even, odd) = int_stream.fork(|i| i % 2 == 0);

    assert_eq!(even.collect().wait().unwrap(), [0, 2]);
    assert_eq!(odd.collect().wait().unwrap(), [1, 3]);
}
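
The forking idea can be sketched without any async machinery: two handles share one source, and an item that belongs to the other side is queued until that side asks for it. The code below is a standard-library illustration of that routing only. `Fork` and the free function `fork` are hypothetical names, not the ex-futures API.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Hypothetical types for illustration only; not part of ex-futures.
struct Shared<I: Iterator, F> {
    source: I,
    is_left: F,
    left: VecDeque<I::Item>,  // items routed left but not yet consumed
    right: VecDeque<I::Item>, // items routed right but not yet consumed
}

pub struct Fork<I: Iterator, F> {
    shared: Rc<RefCell<Shared<I, F>>>,
    left_side: bool,
}

/// Split `source` in two: items for which `is_left` returns true go to the
/// first handle, all others go to the second.
pub fn fork<I, F>(source: I, is_left: F) -> (Fork<I, F>, Fork<I, F>)
where
    I: Iterator,
    F: FnMut(&I::Item) -> bool,
{
    let shared = Rc::new(RefCell::new(Shared {
        source,
        is_left,
        left: VecDeque::new(),
        right: VecDeque::new(),
    }));
    let left = Fork { shared: Rc::clone(&shared), left_side: true };
    let right = Fork { shared, left_side: false };
    (left, right)
}

impl<I, F> Iterator for Fork<I, F>
where
    I: Iterator,
    F: FnMut(&I::Item) -> bool,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        let mut guard = self.shared.borrow_mut();
        let shared = &mut *guard;
        loop {
            // Serve anything already queued for this side first.
            let queued = if self.left_side {
                shared.left.pop_front()
            } else {
                shared.right.pop_front()
            };
            if queued.is_some() {
                return queued;
            }
            // Otherwise pull one item and route it to the matching queue.
            let item = shared.source.next()?;
            if (shared.is_left)(&item) {
                shared.left.push_back(item);
            } else {
                shared.right.push_back(item);
            }
        }
    }
}
```

Calling `fork(0..4, |i: &i32| i % 2 == 0)` and collecting both handles gives `[0, 2]` and `[1, 3]`, the same split as in the example above. If one side is never read, its queue keeps growing.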