#![cfg(feature = "macros")] use std::time::{Duration, Instant}; use actix::{clock::sleep, prelude::*}; use futures_util::stream::StreamExt; #[derive(Clone, Copy, Message)] #[rtype(usize)] struct Num(usize); struct MyActor(usize); impl Actor for MyActor { type Context = Context; } impl Handler for MyActor { type Result = AtomicResponse; fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result { AtomicResponse::new(Box::pin( sleep(Duration::from_millis(msg.0 as u64)) .into_actor(self) .map(move |_res, this, _| { this.0 += msg.0; this.0 }), )) } } #[actix::test] async fn test_atomic_response() { let items = vec![Num(7), Num(6), Num(5), Num(4), Num(3), Num(2), Num(1)]; let addr = MyActor(0).start(); let fut = futures_util::stream::iter(items) .map(|i| addr.send(i)) .buffer_unordered(7) .fold(Vec::default(), |mut acc, res| { acc.push(res.unwrap()); async { acc } }); let start = Instant::now(); let result = fut.await; assert!(Instant::now().duration_since(start).as_millis() >= 28); assert_eq!(result, vec![7, 13, 18, 22, 25, 27, 28]); }