| Field | Value |
|---|---|
| Crates.io | flow-mumu |
| lib.rs | flow-mumu |
| version | 0.2.0-rc.9 |
| created_at | 2025-07-08 13:56:43.591551+00 |
| updated_at | 2025-10-12 02:10:46.711483+00 |
| description | Stream transform tools plugin for the Lava language |
| homepage | https://lava.nu11.uk |
| repository | https://gitlab.com/tofo/flow-mumu |
| max_upload_size | |
| id | 1742818 |
| size | 194,143 |
Version: 0.2.0-rc.9
Repository: https://gitlab.com/tofo/flow-mumu
License: Dual MIT / Apache-2.0
mumu-flow is a dynamic plugin for the MuMu/Lava interpreter that provides a rich set of streaming combinators and pipeline utilities. It is designed to compose cleanly with MuMu’s iterators and the interpreter’s zero-arg “transform” functions, enabling non-blocking, partial-application-friendly dataflows.
This crate powers flow pipelines such as throttled I/O drains, functional mapping/filtering, compositional chaining, early termination, applicative application, and more — all written in ergonomic MuMu code.
Non-blocking throttling
flow:throttle(ms) and dynamic flow:throttle_with(fn) return AGAIN (temporary no-data) instead of sleeping, so other pipelines continue to run.
Async draining (micro-bursts)
flow:drain_async and flow:drain_then(callback) drain in background poller ticks with item burst and time budget caps; AGAIN is respected as keep-alive.
Functional stages
Mapping (flow:trans / flow:map), filtering (flow:filter), slicing/windowing (flow:slice), collection (flow:to_array), reduction (flow:reduce).
Flow control
flow:stop_when(pred) terminates the stream without emitting the triggering item.
Combinators & utilities
flow:chain (flatMap), flow:concat, flow:of (one-shot), flow:empty (immediate EOF), flow:ap / flow:ap_zip (applicatives), flow:read (chunked file reads).
First-class partials
All stages support placeholders (_) and curried usage. Partials are re-entrant and combine cleanly via flow:compose.
Interoperability
Accept Iterator handles or zero-arg transform functions as sources; return transforms suitable for further composition.
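
As a language-neutral illustration of how flow:compose chains stages right-to-left, here is a minimal Rust sketch. It models stages as plain value-to-value functions, which is a simplification of the plugin's source-to-transform stages; the names Stage and compose are hypothetical and do not come from the plugin's code.

```rust
/// Hypothetical stage type: a boxed function from a value to a value.
pub type Stage<T> = Box<dyn Fn(T) -> T>;

/// Compose stages right-to-left: the LAST stage in the list runs first,
/// mirroring how the rightmost argument of flow:compose sits closest to the data.
pub fn compose<T: 'static>(stages: Vec<Stage<T>>) -> impl Fn(T) -> T {
    move |input| {
        stages
            .iter()
            .rev() // walk from the rightmost stage to the leftmost
            .fold(input, |acc, stage| stage(acc))
    }
}
```

With the stages [x + 1, x * 10], the composed function multiplies first and adds second, so an input of 2 produces 21.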
Most primitives follow one of these shapes:
Stage constructor → (…params…) => (source) => transform
e.g. flow:filter(pred) returns a function that accepts an iterator/transform and yields a zero-arg transform that emits a filtered stream.
Eager stage (when fully applied) → Value
flow:trans(func, source) runs eagerly and returns a concrete array (typed where possible). With a single argument (flow:trans(func)) it returns a stage constructor.
Non-blocking gate → returns "AGAIN"
Throttling stages never sleep; if the period hasn't elapsed they yield a temporary no-data error ("AGAIN"). Async drains and the host event loop understand this and keep the interpreter alive until the next window.
EOF → returns "NO_MORE_DATA"
Signifies the upstream has been fully consumed for this run.
Placeholders (_)
Many bridges treat Value::Placeholder, "_", or ["_"] identically. Mixing positional values and _ lets you assemble pipelines incrementally.
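
The shapes above can be modelled outside MuMu. The following Rust sketch is illustrative only and is not the plugin's implementation: Pull, Transform, from_iter and filter are hypothetical names, with from_iter standing in for the idea of normalising a source into a zero-arg transform.

```rust
/// Outcome of pulling a stream once.
#[derive(Debug, Clone, PartialEq)]
pub enum Pull<T> {
    Item(T),
    Again,      // stands in for the "AGAIN" signal (temporary no-data)
    NoMoreData, // stands in for the "NO_MORE_DATA" signal (EOF)
}

/// Hypothetical model of a zero-arg transform: a closure that is pulled repeatedly.
pub type Transform<T> = Box<dyn FnMut() -> Pull<T>>;

/// Wrap any iterator as a transform (hypothetical helper).
pub fn from_iter<I>(source: I) -> Transform<I::Item>
where
    I: IntoIterator + 'static,
    I::IntoIter: 'static,
    I::Item: 'static,
{
    let mut iter = source.into_iter();
    Box::new(move || match iter.next() {
        Some(value) => Pull::Item(value),
        None => Pull::NoMoreData,
    })
}

/// Stage constructor shape: (params) => (source) => transform.
pub fn filter<T: 'static>(
    pred: impl Fn(&T) -> bool + 'static,
) -> impl FnOnce(Transform<T>) -> Transform<T> {
    move |mut source: Transform<T>| -> Transform<T> {
        Box::new(move || loop {
            match source() {
                // Emit matching items; skip the rest and pull again.
                Pull::Item(value) => {
                    if pred(&value) {
                        return Pull::Item(value);
                    }
                }
                // Again and NoMoreData pass through unchanged.
                other => return other,
            }
        })
    }
}
```

Calling filter(pred) yields a function that still waits for its source, which is the same two-step application the stage-constructor shape describes.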
Below is a practical overview of the exported functions (as named inside MuMu). Each function also exists as a variable of the same name to allow direct invocation.
| Name | Signature (informal) | Notes |
|---|---|---|
| flow:compose | (...stagesOrPartials) → Function | Chains stages right-to-left; supports _ as slot. Final arg can be “data” to apply immediately. |
| flow:concat | (a, b) → transform | Emits a to completion, then b. a and b may each be an Iterator or a transform. |
| flow:chain | (mapper, source) → transform | Flat-map: mapper(item) may return a value, iterator, or transform; the result is flattened. |
| flow:stop_when | (pred[, source]) → transform | Stops the pipeline when pred(item) is truthy; does not emit the triggering item. |
| flow:empty | () → transform | Emits no data (EOF). |
| flow:of | (value?) → transform | Emits value exactly once, then EOF. |
| flow:ap_zip | (fs, xs) → transform | Zipped applicative: pull one function from fs and one value from xs, apply. |
| flow:ap | (fs, xs) → transform | Cartesian applicative: buffer all xs, then apply each f ∈ fs to each x ∈ xs. |
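
To make the difference between the zipped and Cartesian applicatives concrete, here is a small Rust sketch over slices. It is only a model: the real stages work on streams, the f-major output order of ap is an assumption, and the plugin's behaviour when fs and xs have different lengths is not modelled (this sketch simply stops at the shorter input).

```rust
/// Zipped application: the i-th function is applied to the i-th value.
pub fn ap_zip(fs: &[fn(i32) -> i32], xs: &[i32]) -> Vec<i32> {
    fs.iter().zip(xs).map(|(f, x)| f(*x)).collect()
}

/// Cartesian application: every function is applied to every value.
/// Assumed order: all results for the first function, then the second, and so on.
pub fn ap(fs: &[fn(i32) -> i32], xs: &[i32]) -> Vec<i32> {
    let mut out = Vec::with_capacity(fs.len() * xs.len());
    for f in fs {
        for x in xs {
            out.push(f(*x));
        }
    }
    out
}
```

With fs = [double, increment] and xs = [10, 20], ap_zip gives [20, 21] while ap gives [20, 40, 11, 21].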
| Name | Signature (informal) | Notes |
|---|---|---|
| flow:trans / flow:map | (fn[, source]) → transform or array | With two args it is eager and returns an array; with one arg it returns a stage. |
| flow:filter | (pred[, source]) → transform | Predicate may return bool or number truthiness (!= 0). |
| flow:slice | (skip, count[, source]) → transform | Skips skip items, then emits up to count. Accepts iterator/transform sources. |
| flow:reduce | (fn, init, source) → Value | Eager reduction: acc = fn(acc, item). Source may be iterator or transform. |
| flow:to_array | (source) → Array | Collects a homogeneous typed array (int/float/str) or errors on mixed types. |
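
The "homogeneous typed array or error" rule of flow:to_array can be sketched as follows. This is a Rust model under stated assumptions, not the plugin's code: Value and TypedArray are hypothetical stand-ins for the interpreter's types, and returning an empty int array for empty input is an arbitrary choice made here.

```rust
/// Hypothetical stand-in for an interpreter value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i64),
    Float(f64),
    Str(String),
}

/// Hypothetical stand-in for the typed arrays a collector may produce.
#[derive(Debug, PartialEq)]
pub enum TypedArray {
    Int(Vec<i64>),
    Float(Vec<f64>),
    Str(Vec<String>),
}

/// Collect values into one typed array; fail as soon as the types diverge.
pub fn to_array(items: Vec<Value>) -> Result<TypedArray, String> {
    let mut iter = items.into_iter();
    // The first element decides the array type.
    let mut out = match iter.next() {
        None => return Ok(TypedArray::Int(Vec::new())), // assumption for empty input
        Some(Value::Int(i)) => TypedArray::Int(vec![i]),
        Some(Value::Float(f)) => TypedArray::Float(vec![f]),
        Some(Value::Str(s)) => TypedArray::Str(vec![s]),
    };
    for item in iter {
        match (&mut out, item) {
            (TypedArray::Int(v), Value::Int(i)) => v.push(i),
            (TypedArray::Float(v), Value::Float(f)) => v.push(f),
            (TypedArray::Str(v), Value::Str(s)) => v.push(s),
            (_, other) => return Err(format!("mixed element types: unexpected {:?}", other)),
        }
    }
    Ok(out)
}
```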
| Name | Signature (informal) | Notes |
|---|---|---|
| flow:throttle | (ms[, source]) → transform | Non-blocking gate per instance; first item passes immediately. |
| flow:throttle_with | (period_fn[, source]) → transform | Period is computed after each successful emit by calling period_fn(). |
| flow:drain | (source) → int | Blocking drain (in MuMu terms): pulls until EOF; returns count. |
| flow:drain_async | (source[, callback]) → 0 | Spawns a background poller; micro-burst drain per tick. Respects AGAIN and avoids busy-wait. |
| flow:drain_then | (source, callback) → 0 | Like drain_async but callback is required and is invoked with the total count exactly once on EOF. |
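
A non-blocking gate of the kind flow:throttle describes can be sketched in a few lines. The Rust below is a model, not the plugin's implementation: Pull, Throttle and pull_at are hypothetical names, and the current time is passed in explicitly so the behaviour is easy to follow and to test.

```rust
use std::time::{Duration, Instant};

/// Outcome of one pull; Again models "AGAIN", NoMoreData models "NO_MORE_DATA".
#[derive(Debug, PartialEq)]
pub enum Pull<T> {
    Item(T),
    Again,
    NoMoreData,
}

/// Fixed-period gate in front of an iterator source.
pub struct Throttle<I> {
    source: I,
    period: Duration,
    next_allowed: Option<Instant>, // None until the first item has been emitted
}

impl<I: Iterator> Throttle<I> {
    pub fn new(period: Duration, source: I) -> Self {
        Throttle { source, period, next_allowed: None }
    }

    /// Pull once at time `now`. Never sleeps: a closed gate answers Again.
    pub fn pull_at(&mut self, now: Instant) -> Pull<I::Item> {
        if let Some(deadline) = self.next_allowed {
            if now < deadline {
                return Pull::Again;
            }
        }
        match self.source.next() {
            Some(value) => {
                // The next window opens one period after this emission.
                self.next_allowed = Some(now + self.period);
                Pull::Item(value)
            }
            None => Pull::NoMoreData,
        }
    }
}
```

The first pull passes immediately because no deadline has been set yet; a dynamic-period variant would recompute the period after each emission instead of storing a fixed one.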
Tuning drain_async/drain_then
Set environment variables to adjust poller behaviour:
- LAVA_FLOW_BURST (default 64): max items per tick
- LAVA_FLOW_BUDGET_US (default 500): time budget per tick (µs)
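
One way to picture how an item cap and a time budget bound a single poller tick is the Rust sketch below. It is an assumption-laden model, not the plugin's poller: Pull, Tick, Limits and drain_tick are hypothetical names; only the two environment variable names and their defaults come from the text above.

```rust
use std::time::{Duration, Instant};

/// Outcome of one pull; Again models "AGAIN", NoMoreData models "NO_MORE_DATA".
pub enum Pull<T> {
    Item(T),
    Again,
    NoMoreData,
}

/// What a single tick reports back to the (hypothetical) scheduler.
#[derive(Debug, PartialEq)]
pub enum Tick {
    Yielded, // limits reached; more data may be available next tick
    Waiting, // upstream said Again; keep the drain alive and retry later
    Done,    // upstream reached EOF
}

pub struct Limits {
    pub burst: usize,
    pub budget: Duration,
}

impl Limits {
    /// Read the limits from the environment, falling back to the documented defaults.
    pub fn from_env() -> Self {
        let read = |name: &str, default: u64| -> u64 {
            std::env::var(name)
                .ok()
                .and_then(|v| v.parse::<u64>().ok())
                .unwrap_or(default)
        };
        Limits {
            burst: read("LAVA_FLOW_BURST", 64) as usize,
            budget: Duration::from_micros(read("LAVA_FLOW_BUDGET_US", 500)),
        }
    }
}

/// Drain at most `burst` items or until the time budget is spent, whichever comes first.
pub fn drain_tick<T, F>(source: &mut F, limits: &Limits, count: &mut u64) -> Tick
where
    F: FnMut() -> Pull<T>,
{
    let started = Instant::now();
    for _ in 0..limits.burst {
        match source() {
            Pull::Item(_) => *count += 1,
            Pull::Again => return Tick::Waiting,
            Pull::NoMoreData => return Tick::Done,
        }
        if started.elapsed() >= limits.budget {
            break;
        }
    }
    Tick::Yielded
}
```

A scheduler would call drain_tick repeatedly until it reports Done, and at that point a drain_then-style callback could be invoked once with the final count.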
| Name | Signature (informal) | Notes |
|---|---|---|
| flow:read | (chunkSizeBytes, path) → transform | Reads fixed-size byte chunks from a file; each chunk is decoded via UTF-8 lossy and emitted as a string. EOF ⇒ "NO_MORE_DATA". |
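
Chunked reading with per-chunk lossy decoding can be sketched with the Rust standard library alone. The function below is a hypothetical model (read_chunks is not a plugin function): it collects all chunks eagerly rather than emitting them one pull at a time, and it accepts any reader so it can be tried on in-memory bytes as well as on a file.

```rust
use std::io::{self, BufReader, Read};

/// Read `input` in chunks of up to `chunk_size` bytes and decode each chunk
/// independently with lossy UTF-8 (invalid bytes become U+FFFD).
pub fn read_chunks<R: Read>(input: R, chunk_size: usize) -> io::Result<Vec<String>> {
    let mut reader = BufReader::new(input);
    let mut buf = vec![0u8; chunk_size];
    let mut chunks = Vec::new();
    loop {
        // A single read may return fewer bytes than requested, so keep
        // reading until the chunk is full or the input is exhausted.
        let mut filled = 0;
        while filled < chunk_size {
            let n = reader.read(&mut buf[filled..])?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        if filled == 0 {
            break; // EOF (or a zero chunk size): nothing more to emit
        }
        chunks.push(String::from_utf8_lossy(&buf[..filled]).into_owned());
    }
    Ok(chunks)
}
```

For a file, pass std::fs::File::open(path)? as the input. Because each chunk is decoded on its own, a multi-byte character that straddles a chunk boundary decodes to replacement characters on both sides; that is a property of this decoding strategy in general.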
Most stages accept either:
- an Iterator handle (e.g. step(start, end)), or
- a zero-arg transform function: () => value | "NO_MORE_DATA".

Internally, the plugin normalizes everything to a transform via to_transform(…).

Throttling stages (flow:throttle, flow:throttle_with) never sleep. If the period hasn’t elapsed they immediately return the string error "AGAIN". Async drains treat "AGAIN" as a keep-alive so the scheduler doesn’t idle out just before the next window.

End of stream is signalled by "NO_MORE_DATA".

All stages accept _ as a placeholder so you can incrementally build up a chain:
p = flow:filter(_, _) # both slots open
p = p(x => x % 2 == 0) # fill predicate
tf = p(ink(1,10)) # fill source
flow:trans(fn, source) runs now and returns a typed array. flow:trans(fn) returns a stage to be used inside flow:compose or called later.

flow:throttle_with(period_fn, source)
- period_fn is invoked only after a successful emission; the next allowed instant becomes now + ms, where ms is the value period_fn returned.
- period_fn must return a non-negative number (int/long/float).

flow:stop_when(pred)
- When pred(item) is truthy, the stage terminates and does not emit the triggering item (NO_MORE_DATA).

Examples (see examples/)
The repo contains many runnable .mu scripts demonstrating the stages end-to-end. Here are a few tiny, illustrative snippets. These assume the MuMu/Lava runtime has the appropriate plugins loaded (e.g. extend("flow"), extend("net")).
extend("flow")
run = flow:compose(
flow:drain_async, # drain in the background
flow:chain(slog), # print items
flow:throttle(10), # non-blocking 10ms gate
flow:filter(x => x % 2) # keep odd numbers
)
run(step(1, 100)) # -> 1,3,5,...
extend("flow")
^delay = 1
pipe = flow:compose(
flow:drain_async,
flow:chain(_ => delay += 50),
flow:throttle_with(() => delay), # period grows after each emit
flow:chain(slog)
)
pipe(step(1, 20))
extend("flow")
^count = 0
flow:compose(
flow:drain_async,
flow:chain(_ => slog(++count)),
flow:stop_when(_ => count > 10)
)(step(1, 99999))
extend("flow")
pipeline = flow:compose(
flow:drain_async,
flow:chain(sput),
flow:throttle(10),
flow:read(64) # 64-byte chunks, UTF-8 lossy
)
pipeline("README.md")
extend("flow")
fs = flow:of(x => x * 10) # emits a single function
xs = step(1, 5) # 1..4
flow:to_array(flow:ap_zip(fs, xs)) # => [10, 20, 30, 40]
See examples/ for more: async drains with callbacks, net ping/LLDP demos, partial composition patterns, reduction, etc.
Cooperative scheduling
Every stage avoids blocking the interpreter loop. Throttles use AGAIN; async drains use tiny micro-bursts governed by LAVA_FLOW_BURST and LAVA_FLOW_BUDGET_US (defaults: 64 items, 500µs).
Typed collections
Collectors/transformers strive to produce the tightest possible array type (IntArray, FloatArray, StrArray). Heterogeneous results fall back to MixedArray (or error in strict collectors).
File I/O
flow:read reads raw bytes via a BufReader and converts each chunk to a string using UTF-8 lossy decoding for maximal robustness.
Correctness guards
For example, flow:throttle rejects negative periods; flow:filter enforces that predicates are functions; flow:to_array errors on mixed element types; flow:slice validates both skip and count.
src/
lib.rs # plugin entry point, function registrations
trans.rs # map (eager on 2-arg), and alias registration
filter.rs # predicate-based filter
slice.rs # windowing (skip/take)
to_array.rs # collectors
reduce.rs # eager reduce
throttle.rs # fixed-period non-blocking throttle
throttle_with.rs # dynamic-period non-blocking throttle
chain.rs # flatMap (value/iterator/transform to stream)
concat.rs # sequential concatenation
of.rs # one-shot iterator
empty.rs # empty iterator (EOF)
ap.rs # applicative: zip/cartesian
drain.rs # blocking drain to count
drain_async.rs # async micro-burst draining
drain_then.rs # async draining with required callback on EOF
read.rs # chunked file reader (UTF-8 lossy)
stop_when.rs # early termination stage
tests/ # MuMu specs covering compose, trans, slice, filter, throttle, etc.
examples/ # many runnable MuMu scripts (pipelines, net demos, etc.)
core-mumu = 0.9.0-rc.3 (aliased as mumu in code).
- host: enable for native .so/.dll builds; mirrors the MuMu host ABI.
- web: marker for WASM/web builds (no host-only deps).

Contributions are welcome! Please open issues or MRs on GitLab.
Before submitting, please:
- Use "AGAIN" and "NO_MORE_DATA" consistently across stages.
This project is dual-licensed under MIT and Apache-2.0. You may choose either license. See LICENSE for details.