Crates.io | hopper |
lib.rs | hopper |
version | 0.4.2 |
source | src |
created_at | 2016-12-18 03:33:24.591322 |
updated_at | 2018-02-28 04:52:57.837121 |
description | an unbounded mpsc with bounded memory |
homepage | https://github.com/postmates/hopper |
repository | https://github.com/postmates/hopper |
max_upload_size | |
id | 7660 |
size | 66,948 |
Hopper provides a version of the rust standard mpsc that is unbounded but consumes a bounded amount of memory. This is done by paging elements to disk at need. The ambition here is to support mpsc style communication without allocating unbounded amounts of memory or dropping inputs on the floor.
Include the hopper library in your Cargo.toml
hopper = "0.4"
and use it in much the same way you'd use stdlib's mpsc:
extern crate tempdir;
extern crate hopper;
let dir = tempdir::TempDir::new("hopper").unwrap();
let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
snd.send(9);
assert_eq!(Some(9), rcv.iter().next());
The primary difference here is that you must provide a name for the channel and a directory where hopper can page items to disk.
Hopper is intended to be used in situtations where your system cannot load-shed inputs and must eventually process them. Hopper maintains an in-memory buffer of inputs with disk overflow when the in-memory buffer is full. While hopper does page to disk it will not preserve writes across restarts, much in the same way as stdlib mpsc.
Hopper's channel looks very much like a named pipe in Unix. You supply a
name to either channel_2
or channel_with_max_bytes_3
and you push bytes
in and out. The disk paging adds a complication. In private, the name
supplied to the above two functions is used to create a directory under
data_dir
. This directory gets filled up with monotonically increasing
files in situations where the disk paging is in use. We'll treat this
exclusively from here on.
The on-disk structure look like so:
data-dir/
sink-name0/
0
1
sink-name1/
0
You'll notice exports of Sender and Receiver in this module's
namespace. These are the structures that back the send and receive side of
the named channel. The Senders--there may be multiples of them--are
responsible for creating "queue files". In the above,
data-dir/sink-name*/*
are queue files. These files are treated as
append-only logs by the Senders. The Receivers trawl through these logs to
read the data serialized there.
Maybe! Each Sender has a notion of the maximum bytes it may read--which you
can set explicitly when creating a channel with
channel_with_max_bytes
--and once the Sender has gone over that limit it'll
attempt to mark the queue file as read-only and create a new file. The
Receiver is programmed to read its current queue file until it reaches EOF
and finds the file is read-only, at which point it deletes the file--it is
the only reader--and moves on to the next.
hopper::channel_with_max_bytes
takes a max_disk_files
argument, defining the
total number of overflow files that can exist concurrently. If all memory and
disk buffers are full, sends into the queue will fail. The error result is
written such that the caller can recover ownership of the input value. By
default max_disk_files == usize::max_value()
and so if the Receiver is unable
to keep up with the Senders then, oops, your disk will gradually fill up.
Hopper is intended to work on any wacky old filesystem with any options, even at high concurrency. As common filesystems do not support interleaving small atomic writes hopper limits itself to one exclusive Sender and one exclusive Receiver at a time. This potentially limits the concurrency of mpsc but maintains data integrity. We are open to improvements in this area.
Hopper ships with benchmarks. We've seen performance only 30% slower than stdlib's mpsc all the way up to 70x worse, depending on configuration options, disk IO speeds and the like. We warmly encourage you benchmark on your system.