use std::{ alloc::{GlobalAlloc, System}, future::ready, sync::atomic::AtomicUsize, }; use futures::{ stream::{self, FuturesUnordered}, FutureExt, StreamExt, }; use futures_buffered::{BufferedStreamExt, FuturesUnorderedBounded}; struct TrackingAllocator { alloc_count: AtomicUsize, dealloc_count: AtomicUsize, alloc: AtomicUsize, dealloc: AtomicUsize, } unsafe impl GlobalAlloc for TrackingAllocator { unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 { self.alloc .fetch_add(layout.size(), std::sync::atomic::Ordering::Relaxed); self.alloc_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed); System.alloc(layout) } unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) { self.dealloc .fetch_add(layout.size(), std::sync::atomic::Ordering::Relaxed); self.dealloc_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed); System.dealloc(ptr, layout) } } impl TrackingAllocator { fn reset(&self) { self.alloc_count .store(0, std::sync::atomic::Ordering::Relaxed); self.dealloc_count .store(0, std::sync::atomic::Ordering::Relaxed); self.alloc.store(0, std::sync::atomic::Ordering::Relaxed); self.dealloc.store(0, std::sync::atomic::Ordering::Relaxed); } fn report(&self) { let alloc_count = self.alloc_count.load(std::sync::atomic::Ordering::Relaxed); let dealloc_count = self .dealloc_count .load(std::sync::atomic::Ordering::Relaxed); let alloc = self.alloc.load(std::sync::atomic::Ordering::Relaxed); let dealloc = self.dealloc.load(std::sync::atomic::Ordering::Relaxed); println!("count1: {:?}", alloc_count); println!("count2: {:?}", dealloc_count); println!("alloc: {:?}", alloc); println!("dealloc: {:?}", dealloc); self.reset(); } } #[global_allocator] static ALLOCATOR: TrackingAllocator = TrackingAllocator { alloc_count: AtomicUsize::new(0), dealloc_count: AtomicUsize::new(0), alloc: AtomicUsize::new(0), dealloc: AtomicUsize::new(0), }; #[cfg(not(miri))] const BATCH: usize = 256; #[cfg(not(miri))] const TOTAL: usize = 512000; #[cfg(miri)] const BATCH: usize = 32; #[cfg(miri)] const TOTAL: usize = 128; #[test] fn futures_unordered() { ALLOCATOR.reset(); let mut queue = FuturesUnordered::new(); for i in 0..BATCH { queue.push(ready(i)) } for i in BATCH..TOTAL { queue.next().now_or_never().unwrap(); queue.push(ready(i)) } for _ in 0..BATCH { queue.next().now_or_never().unwrap(); } ALLOCATOR.report(); } #[test] fn futures_unordered_bounded() { ALLOCATOR.reset(); let mut queue = FuturesUnorderedBounded::new(BATCH); for i in 0..BATCH { queue.push(ready(i)) } for i in BATCH..TOTAL { queue.next().now_or_never().unwrap(); queue.push(ready(i)) } for _ in 0..BATCH { queue.next().now_or_never().unwrap(); } drop(queue); ALLOCATOR.report(); } #[test] fn futures_unordered2() { ALLOCATOR.reset(); let mut queue = futures_buffered::FuturesUnordered::new(); for i in 0..BATCH { queue.push(ready(i)) } for i in BATCH..TOTAL { queue.next().now_or_never().unwrap(); queue.push(ready(i)) } for _ in 0..BATCH { queue.next().now_or_never().unwrap(); } drop(queue); ALLOCATOR.report(); } #[test] fn buffer_unordered() { ALLOCATOR.reset(); let mut s = stream::iter((0..TOTAL).map(ready)).buffer_unordered(BATCH); while s.next().now_or_never().unwrap().is_some() {} ALLOCATOR.report(); } #[test] fn buffered_unordered() { ALLOCATOR.reset(); let mut s = stream::iter((0..TOTAL).map(ready)).buffered_unordered(BATCH); while s.next().now_or_never().unwrap().is_some() {} ALLOCATOR.report(); } #[test] fn futures_join_all() { ALLOCATOR.reset(); let _ = futures::future::join_all((0..BATCH).map(ready)) .now_or_never() .unwrap(); ALLOCATOR.report(); } #[test] fn buffered_join_all() { ALLOCATOR.reset(); let _ = futures_buffered::join_all((0..BATCH).map(ready)) .now_or_never() .unwrap(); ALLOCATOR.report(); }