use futures::{ future, stream::{self, StreamExt as _}, }; use par_stream::{rt, Shared}; use rand::{prelude::*, rngs::OsRng}; use std::time::Duration; use structopt::StructOpt; #[derive(StructOpt)] struct Opts { pub num_jobs: usize, pub num_workers: usize, pub in_buf_size: usize, pub out_buf_size: usize, pub pow: u32, #[structopt(long)] pub spawn: bool, } fn main() { par_stream::rt::block_on_executor(async move { let opts = Opts::from_args(); let elapsed_notifier = shared_stream_by_notifier_test(&opts).await; println!("elapsed for notifier\t{:?}ms", elapsed_notifier.as_millis()); let elapsed_channel = shared_stream_by_channel_test(&opts).await; println!("elapsed for channel\t{:?}ms", elapsed_channel.as_millis()); }); } async fn shared_stream_by_notifier_test(opts: &Opts) -> Duration { let pow = opts.pow; let spawn = opts.spawn; let stream = stream::repeat(()) .take(opts.num_jobs) .map(|()| -> u64 { OsRng.gen() }); let stream = Shared::new(stream); let (out_tx, out_rx) = flume::bounded(opts.out_buf_size); let worker_futures = (0..(opts.num_workers)).map(move |_| { let out_tx = out_tx.clone(); let stream = stream.clone(); rt::spawn(async move { let _ = stream .then(|val| task(val, pow, spawn)) .map(Ok) .forward(out_tx.into_sink()) .await; }) }); let output_future = rt::spawn(async move { out_rx .into_stream() .fold(0u64, |sum, val| future::ready(sum.wrapping_add(val))) .await }); let instant = std::time::Instant::now(); futures::join!(output_future, future::join_all(worker_futures)); instant.elapsed() } async fn shared_stream_by_channel_test(opts: &Opts) -> Duration { let pow = opts.pow; let spawn = opts.spawn; let stream = stream::repeat(()) .take(opts.num_jobs) .map(|()| -> u64 { OsRng.gen() }); let (in_tx, in_rx) = flume::bounded(opts.in_buf_size); let (out_tx, out_rx) = flume::bounded(opts.out_buf_size); let input_future = rt::spawn(async move { let _ = stream.map(Ok).forward(in_tx.into_sink()).await; }); let worker_futures = (0..(opts.num_workers)).map(move |_| { let in_rx = in_rx.clone(); let out_tx = out_tx.clone(); rt::spawn(async move { let _ = in_rx .into_stream() .then(|val| task(val, pow, spawn)) .map(Ok) .forward(out_tx.into_sink()) .await; }) }); let output_future = rt::spawn(async move { out_rx .into_stream() .fold(0u64, |sum, val| future::ready(sum.wrapping_add(val))) .await }); let instant = std::time::Instant::now(); futures::join!( input_future, output_future, future::join_all(worker_futures) ); instant.elapsed() } async fn task(input: u64, pow: u32, spawn: bool) -> u64 { if spawn { rt::spawn_blocking(move || compute(input, pow)).await } else { compute(input, pow) } } fn compute(input: u64, pow: u32) -> u64 { (0..pow).fold(1u64, move |product, _| product.wrapping_mul(input)) }