Crates.io | ex-futures |
lib.rs | ex-futures |
version | 0.4.9 |
source | src |
created_at | 2017-05-10 08:54:42.118506 |
updated_at | 2017-12-03 22:09:08.096785 |
description | An extension of `futures` |
homepage | https://github.com/AtsukiTak/ex-futures |
repository | https://github.com/AtsukiTak/ex-futures |
max_upload_size | |
id | 13971 |
size | 57,169 |
I really want your contribution especially English staff. As you see, my English skill is not good. Please help me to improve documentation.
An extension of futures
.
For now, this crate enables you to
create publish subscribe channel (will be removed future)
convert any kind of stream/sink into "cloneable"
fork any kind of stream
convert Error
associated type which is ()
An usage is almost same with futures::unsync::mpsc::unbounded
.
use ex_futures::unsync::pubsub::unbounded;
fn main() {
let (tx, rx) = unbounded::<usize>();
let rx2 = rx.clone();
let mut rx = rx.wait();
let mut rx2 = rx.wait(); // Subscriber is cloneable
tx.send(1).wait().unwrap();
assert_eq!(rx.next().unwrap().map(|i| *i), Ok(1));
assert_eq!(rx2.next().unwrap().map(|i| *i), Ok(1));
}
use ex_futures::StreamExt;
use futures::unsycn::mpsc::channel;
fn main() {
let (tx, rx) = channel(42);;
let cloneable_rx = rx.unsync_cloneable(); // Convert "rx" into cloneable
let cloneable_rx2 = cloneable.clone(); // Now you can clone it
let tx = tx.wait();
tx.send(0);
tx.send(1);
tx.send(2);
tx.send(3);
assert_eq!(cloneable_rx.collect().wait().unwrap(), [0, 1, 2, 3]);
assert_eq!(cloneable_rx2.collect().wait().unwrap(), [0, 1, 2, 3]);
}
use ex_futures::StreamExt;
fn main() {
let int_stream = gen_stream(); // Somehow you create some stream
let (even, odd) = int_stream.fork(|i| i % 2 == 0);
assert_eq!(even.collect().wait().unwrap(), [0, 2]);
assert_eq!(odd.collect().wait().unwrap(), [1, 3]);
}