flow-mumu

Crates.io: flow-mumu
lib.rs: flow-mumu
version: 0.2.0-rc.9
created_at: 2025-07-08 13:56:43.591551+00
updated_at: 2025-10-12 02:10:46.711483+00
description: Stream transform tools plugin for the Lava language
homepage: https://lava.nu11.uk
repository: https://gitlab.com/tofo/flow-mumu
id: 1742818
size: 194,143
(justifiedmumu)


mumu-flow — Stream transforms for the MuMu/Lava runtime

Version: 0.2.0-rc.9
Repository: https://gitlab.com/tofo/flow-mumu
License: Dual MIT / Apache-2.0

mumu-flow is a dynamic plugin for the MuMu/Lava interpreter that provides a rich set of streaming combinators and pipeline utilities. It is designed to compose cleanly with MuMu’s iterators and the interpreter’s zero-arg “transform” functions, enabling non-blocking, partial-application-friendly dataflows.

This crate powers flow pipelines such as throttled I/O drains, functional mapping/filtering, compositional chaining, early termination, applicative application, and more — all written in ergonomic MuMu code.


Highlights at a glance

  • Non-blocking throttling
    flow:throttle(ms) and dynamic flow:throttle_with(fn) return AGAIN (temporary no-data) instead of sleeping, so other pipelines continue to run.

  • Async draining (micro-bursts)
    flow:drain_async and flow:drain_then(callback) drain the source in background poller ticks, each capped by an item-burst limit and a time budget; AGAIN is treated as a keep-alive rather than an error.

  • Functional stages
    Mapping (flow:trans / flow:map), filtering (flow:filter), slicing/windowing (flow:slice), collection (flow:to_array), reduction (flow:reduce).

  • Flow control
    flow:stop_when(pred) terminates the stream without emitting the triggering item.

  • Combinators & utilities
    flow:chain (flatMap), flow:concat, flow:of (one-shot), flow:empty (immediate EOF), flow:ap / flow:ap_zip (applicatives), flow:read (chunked file reads).

  • First-class partials
    All stages support placeholders (_) and curried usage. Partials are re-entrant and combine cleanly via flow:compose.

  • Interoperability
    Accept Iterator handles or zero-arg transform functions as sources; return transforms suitable for further composition.


Mental model

Most primitives follow one of these shapes:

  • Stage constructor(…params…) => (source) => transform
    e.g. flow:filter(pred) returns a function that accepts an iterator/transform and yields a zero-arg transform that emits a filtered stream.

  • Eager stage (when fully applied) → Value
    flow:trans(func, source) runs eagerly and returns a concrete array (typed where possible). With a single argument, flow:trans(func) instead returns a stage to be applied to a source later.

  • Non-blocking gate → returns "AGAIN"
    Throttling stages never sleep; if the period hasn't elapsed they yield a temporary no-data error ("AGAIN"). Async drains and the host event loop understand this and keep the interpreter alive until the next window.

  • EOF → returns "NO_MORE_DATA"
    Signifies the upstream has been fully consumed for this run.
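To make these shapes concrete, here is a minimal Rust model of the protocol. It is a sketch under stated assumptions, not flow-mumu's internal API: `Step`, `Transform`, `counter`, and `collect` are hypothetical names, and the `Again` / `NoMoreData` variants stand in for the "AGAIN" and "NO_MORE_DATA" string errors. `filter` follows the "stage constructor(params) => (source) => transform" shape.

```rust
// Hypothetical model of the transform protocol (not flow-mumu's actual API).
#[derive(Debug, Clone, PartialEq)]
enum Step<T> {
    Item(T),    // a value was produced
    Again,      // temporary no-data, like "AGAIN"
    NoMoreData, // end of stream, like "NO_MORE_DATA"
}

// A zero-arg transform: every call yields the next Step.
type Transform<T> = Box<dyn FnMut() -> Step<T>>;

// Stage constructor: pred => (source => transform), like flow:filter(pred)(source).
fn filter<T: 'static>(
    pred: impl Fn(&T) -> bool + Clone + 'static,
) -> impl Fn(Transform<T>) -> Transform<T> {
    move |mut source: Transform<T>| {
        let pred = pred.clone();
        Box::new(move || loop {
            match source() {
                Step::Item(x) if pred(&x) => return Step::Item(x),
                Step::Item(_) => continue, // rejected item: pull the next one
                other => return other,     // AGAIN and EOF pass through unchanged
            }
        }) as Transform<T>
    }
}

// Hypothetical source emitting start..end (end exclusive), then EOF.
fn counter(start: i64, end: i64) -> Transform<i64> {
    let mut next = start;
    Box::new(move || {
        if next < end {
            next += 1;
            Step::Item(next - 1)
        } else {
            Step::NoMoreData
        }
    })
}

// Pull until EOF (spins on AGAIN; a real drain would yield to the scheduler).
fn collect<T>(mut t: Transform<T>) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match t() {
            Step::Item(x) => out.push(x),
            Step::Again => continue,
            Step::NoMoreData => return out,
        }
    }
}

fn main() {
    let keep_odd = filter(|x: &i64| x % 2 != 0);
    println!("{:?}", collect(keep_odd(counter(1, 10)))); // [1, 3, 5, 7, 9]
}
```

Passing AGAIN and EOF through untouched is what lets a downstream throttle or drain decide how to react.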

Placeholders _
Many bridges treat Value::Placeholder, "_", or ["_"] identically. Mixing positional values and _ lets you assemble pipelines incrementally.


Function catalog

Below is a practical overview of the exported functions (as named inside MuMu). Each function also exists as a variable of the same name to allow direct invocation.

Composition & control

Name | Signature (informal) | Notes
flow:compose | (...stagesOrPartials) → Function | Chains stages right-to-left; supports _ as a slot. The final argument can be data, in which case the chain is applied immediately.
flow:concat | (a, b) → transform | Emits a to completion, then b. a and b may be Iterators or transforms.
flow:chain | (mapper, source) → transform | Flat-map: mapper(item) may return a value, an iterator, or a transform; the result is flattened.
flow:stop_when | (pred[, source]) → transform | Stops the pipeline when pred(item) is truthy; the triggering item is not emitted.
flow:empty | () → transform | Emits no data (immediate EOF).
flow:of | (value?) → transform | Emits value exactly once, then EOF.
flow:ap_zip | (fs, xs) → transform | Zipped applicative: pulls one function from fs and one value from xs, and applies the function to the value.
flow:ap | (fs, xs) → transform | Cartesian applicative: buffers all of xs, then applies each f ∈ fs to each x ∈ xs.
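Because flow:compose applies stages right-to-left, the stage written last sees the data first. The Rust sketch below illustrates only that ordering rule, using plain functions in place of MuMu stages; `Stage` and `compose` are hypothetical names, and placeholders and the "final argument is data" shortcut are not modelled.

```rust
// Hypothetical sketch of right-to-left composition (not flow-mumu's code).
type Stage<T> = Box<dyn Fn(T) -> T>;

fn compose<T>(stages: Vec<Stage<T>>) -> impl Fn(T) -> T {
    // The rightmost stage sees the input first, matching the README's
    // flow:compose(flow:drain_async, ..., flow:filter(pred)) layout.
    move |input: T| stages.iter().rev().fold(input, |acc, stage| stage(acc))
}

fn main() {
    let double: Stage<i64> = Box::new(|x: i64| x * 2);
    let add_one: Stage<i64> = Box::new(|x: i64| x + 1);
    let f = compose(vec![double, add_one]);
    println!("{}", f(5)); // add_one runs first: (5 + 1) * 2 = 12
}
```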

Functional transforms

Name | Signature (informal) | Notes
flow:trans / flow:map | (fn[, source]) → stage or Array | With two arguments it is eager and returns an array; with one argument it returns a stage.
flow:filter | (pred[, source]) → transform | The predicate may return a bool or a number (non-zero counts as true).
flow:slice | (skip, count[, source]) → transform | Skips skip items, then emits up to count. Accepts iterator or transform sources.
flow:reduce | (fn, init, source) → Value | Eager reduction: acc = fn(acc, item). The source may be an iterator or a transform.
flow:to_array | (source) → Array | Collects a homogeneous typed array (int/float/str); errors on mixed types.
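The skip/count window of flow:slice can be modelled as a small state machine over a zero-arg transform. This Rust sketch is an assumption-based illustration, not the code in slice.rs: `Step`, `counter`, and `to_vec` are hypothetical helpers, and the source is passed as the last argument as in the informal signature.

```rust
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
}

// Hypothetical model of flow:slice(skip, count, source).
fn slice<T>(
    skip: usize,
    count: usize,
    mut source: impl FnMut() -> Step<T>,
) -> impl FnMut() -> Step<T> {
    let (mut skipped, mut emitted) = (0, 0);
    move || loop {
        if emitted >= count {
            return Step::NoMoreData; // window filled: stop without pulling upstream
        }
        match source() {
            Step::Item(_) if skipped < skip => skipped += 1, // still inside the skip prefix
            Step::Item(x) => {
                emitted += 1;
                return Step::Item(x);
            }
            other => return other, // AGAIN and EOF pass through
        }
    }
}

// Hypothetical source: start..end (end exclusive), then EOF.
fn counter(start: i64, end: i64) -> impl FnMut() -> Step<i64> {
    let mut next = start;
    move || {
        if next < end {
            next += 1;
            Step::Item(next - 1)
        } else {
            Step::NoMoreData
        }
    }
}

fn to_vec<T>(mut t: impl FnMut() -> Step<T>) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match t() {
            Step::Item(x) => out.push(x),
            Step::Again => {} // a real drain would yield to the scheduler here
            Step::NoMoreData => return out,
        }
    }
}

fn main() {
    println!("{:?}", to_vec(slice(2, 3, counter(1, 10)))); // [3, 4, 5]
}
```

Checking the count before pulling means a filled window never consumes an extra upstream item.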

Throttling & draining

Name | Signature (informal) | Notes
flow:throttle | (ms[, source]) → transform | Non-blocking gate per instance; the first item passes immediately.
flow:throttle_with | (period_fn[, source]) → transform | The period is recomputed after each successful emit by calling period_fn().
flow:drain | (source) → int | Blocking drain (in MuMu terms): pulls until EOF and returns the item count.
flow:drain_async | (source[, callback]) → 0 | Spawns a background poller that drains in micro-bursts per tick. Respects AGAIN and avoids busy-waiting.
flow:drain_then | (source, callback) → 0 | Like drain_async, but callback is required and is invoked exactly once, on EOF, with the total count.

Tuning drain_async / drain_then
Set these environment variables to adjust the poller's behaviour:
  • LAVA_FLOW_BURST (default 64): maximum items pulled per tick
  • LAVA_FLOW_BUDGET_US (default 500): time budget per tick, in µs
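The burst and budget caps can be pictured as one bounded loop per poller tick. The Rust sketch below assumes a simplified model: `drain_tick`, `Tick`, and `env_or` are hypothetical names, only the two environment variable names and their defaults come from this README, and the real poller's scheduling (including returning 1 as a keep-alive) is not reproduced.

```rust
use std::env;
use std::time::{Duration, Instant};

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
}

#[derive(Debug, PartialEq)]
enum Tick {
    KeepAlive, // reschedule: more items may arrive (includes AGAIN-only ticks)
    Done,      // EOF reached
}

// Read a numeric knob from the environment, falling back to a default.
fn env_or(name: &str, default: u64) -> u64 {
    env::var(name).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

// One poller tick: pull until the burst cap, the time budget, AGAIN, or EOF.
fn drain_tick<T>(
    source: &mut impl FnMut() -> Step<T>,
    burst: u64,
    budget: Duration,
    count: &mut u64,
) -> Tick {
    let started = Instant::now();
    for _ in 0..burst {
        match source() {
            Step::Item(_) => *count += 1,
            Step::Again => return Tick::KeepAlive, // gate closed: retry next tick, no busy-wait
            Step::NoMoreData => return Tick::Done,
        }
        if started.elapsed() >= budget {
            break; // budget spent: hand control back to other pipelines
        }
    }
    Tick::KeepAlive
}

fn main() {
    let burst = env_or("LAVA_FLOW_BURST", 64).max(1);
    let budget = Duration::from_micros(env_or("LAVA_FLOW_BUDGET_US", 500));
    let mut n = 0;
    let mut source = move || {
        if n < 200 {
            n += 1;
            Step::Item(n)
        } else {
            Step::NoMoreData
        }
    };
    let (mut count, mut ticks) = (0, 1);
    while drain_tick(&mut source, burst, budget, &mut count) == Tick::KeepAlive {
        ticks += 1; // a real host would return to its event loop between ticks
    }
    println!("drained {count} items in {ticks} ticks");
}
```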

File I/O

Name | Signature (informal) | Notes
flow:read | (chunkSizeBytes, path) → transform | Reads fixed-size byte chunks from a file; each chunk is decoded as lossy UTF-8 and emitted as a string. EOF ⇒ "NO_MORE_DATA".
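A rough Rust model of the chunking and decoding described for flow:read follows. It is a sketch, not read.rs: `read_chunks` and `Step` are hypothetical names, and it reads from any `std::io::Read` so it can be exercised with an in-memory byte slice instead of a file path.

```rust
use std::io::{BufReader, Read};

#[derive(Debug, PartialEq)]
enum Step {
    Item(String),  // one decoded chunk
    NoMoreData,    // EOF
    Error(String), // any I/O error is terminal
}

// Hypothetical model of flow:read over any reader (a File in practice).
fn read_chunks<R: Read>(chunk_size: usize, reader: R) -> impl FnMut() -> Step {
    assert!(chunk_size > 0, "chunk size must be positive");
    let mut reader = BufReader::new(reader);
    move || {
        let mut buf = Vec::with_capacity(chunk_size);
        // take() + read_to_end() keeps reading until the chunk is full or EOF,
        // so only the final chunk can be short.
        match reader.by_ref().take(chunk_size as u64).read_to_end(&mut buf) {
            Ok(0) => Step::NoMoreData,
            Ok(_) => Step::Item(String::from_utf8_lossy(&buf).into_owned()),
            Err(e) => Step::Error(e.to_string()),
        }
    }
}

fn main() {
    let mut next = read_chunks(4, "hello, flow".as_bytes());
    while let Step::Item(chunk) = next() {
        println!("{chunk:?}"); // "hell", "o, f", "low"
    }
}
```

Because chunks are cut at byte boundaries, a multi-byte UTF-8 character that straddles two chunks decodes as U+FFFD replacement characters on both sides; that is the trade-off lossy decoding makes for robustness.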

Behavioural details & contracts

Source & sink interoperability

Most stages accept either:

  • an Iterator handle (e.g., from step(start, end)), or
  • a zero-arg transform () => value | "NO_MORE_DATA".

Internally, the plugin normalizes everything to a transform via to_transform(…).
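The normalization step can be sketched in Rust as a function that turns either kind of source into a zero-arg transform. `Source`, `Step`, and `to_transform` here are hypothetical stand-ins; the README names to_transform(…) but not its signature, so this shows only the idea.

```rust
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
}

// The two kinds of source a stage accepts (hypothetical names).
enum Source<T> {
    Iter(Box<dyn Iterator<Item = T>>),
    Transform(Box<dyn FnMut() -> Step<T>>),
}

// Normalize either kind of source into a zero-arg transform.
fn to_transform<T: 'static>(src: Source<T>) -> Box<dyn FnMut() -> Step<T>> {
    match src {
        Source::Transform(f) => f, // already a transform: use as-is
        Source::Iter(mut it) => Box::new(move || match it.next() {
            Some(x) => Step::Item(x),
            None => Step::NoMoreData, // an exhausted iterator becomes EOF
        }),
    }
}

fn main() {
    let src: Source<i32> = Source::Iter(Box::new(vec![1, 2, 3].into_iter()));
    let mut t = to_transform(src);
    while let Step::Item(x) = t() {
        println!("{x}");
    }
}
```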

Non-blocking gates

  • Throttle stages (flow:throttle, flow:throttle_with) never sleep. If the period hasn’t elapsed, they immediately return the string error "AGAIN".
  • Async drains treat a tick that produced only AGAIN as a keep-alive: the poller returns 1 so the scheduler doesn’t idle out just before the next window opens.
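A non-blocking gate only needs to remember when the next item is allowed. This Rust sketch models flow:throttle under stated assumptions (`Throttle`, `poll_at`, and `Step` are hypothetical names); it takes the current time as a parameter so the AGAIN behaviour can be checked without sleeping.

```rust
use std::time::{Duration, Instant};

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
}

// Hypothetical model of flow:throttle(ms): a per-instance, non-blocking gate.
struct Throttle<T> {
    source: Box<dyn FnMut() -> Step<T>>,
    period: Duration,              // Duration cannot be negative, so no runtime check is needed
    next_allowed: Option<Instant>, // None until the first emit: the first item passes at once
}

impl<T> Throttle<T> {
    fn new(period: Duration, source: impl FnMut() -> Step<T> + 'static) -> Self {
        Throttle { source: Box::new(source), period, next_allowed: None }
    }

    // `now` is a parameter so the gate can be tested without sleeping.
    fn poll_at(&mut self, now: Instant) -> Step<T> {
        if let Some(t) = self.next_allowed {
            if now < t {
                return Step::Again; // window still closed: report AGAIN instead of sleeping
            }
        }
        let step = (self.source)(); // upstream is only pulled when the gate is open
        if let Step::Item(_) = step {
            self.next_allowed = Some(now + self.period);
        }
        step
    }

    fn poll(&mut self) -> Step<T> {
        self.poll_at(Instant::now())
    }
}

fn main() {
    let mut n = 0;
    let mut gate = Throttle::new(Duration::from_millis(10), move || {
        n += 1;
        Step::Item(n)
    });
    println!("{:?}", gate.poll()); // Item(1): the first item passes immediately
    println!("{:?}", gate.poll()); // Again, unless 10 ms have already passed
}
```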

EOF & errors

  • EOF is signalled via the string error "NO_MORE_DATA".
  • Any other error is considered terminal by drainers and most stages.

Partials & placeholders

  • All stage constructors support _ as a placeholder so you can incrementally build up a chain:
    p = flow:filter(_, _)       # both slots open
    p = p(x => x % 2 == 0)      # fill predicate
    tf = p(ink(1,10))           # fill source
    
  • Partials are re-entrant: each call returns a fresh partial with the updated defaults.
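Slot filling with placeholders can be modelled as a list of open and filled slots. The Rust sketch below is illustrative only: `Partial`, `apply`, and `ready` are hypothetical names, `None` plays the role of `_`, and plain values stand in for MuMu functions and sources.

```rust
// Hypothetical model of placeholder slots; plain values stand in for MuMu values.
#[derive(Debug, Clone, PartialEq)]
struct Partial<V: Clone> {
    slots: Vec<Option<V>>, // None marks a slot that is still open (a `_`)
}

impl<V: Clone> Partial<V> {
    fn new(arity: usize) -> Self {
        Partial { slots: vec![None; arity] }
    }

    // Fill open slots left to right; a `None` argument keeps its slot open.
    // Returns a fresh partial and leaves `self` untouched (re-entrancy).
    fn apply(&self, args: &[Option<V>]) -> Partial<V> {
        let mut next = self.clone();
        let mut args = args.iter();
        for slot in next.slots.iter_mut().filter(|s| s.is_none()) {
            match args.next() {
                Some(arg) => *slot = arg.clone(),
                None => break,
            }
        }
        next
    }

    // Every slot filled? Then the stage can actually run.
    fn ready(&self) -> Option<Vec<V>> {
        self.slots.iter().cloned().collect()
    }
}

fn main() {
    let p = Partial::<&str>::new(2);      // like flow:filter(_, _)
    let p1 = p.apply(&[Some("is_even")]); // fill the predicate slot
    let tf = p1.apply(&[Some("source")]); // fill the source slot
    println!("{:?} {:?} {:?}", p.ready(), p1.ready(), tf.ready());
}
```

Returning a clone from `apply` is one simple way to get the re-entrant behaviour described above: the original partial can be reused with different arguments.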

Eager vs. lazy mapping

  • With two arguments, flow:trans(fn, source) runs now and returns a typed array.
  • With one argument, flow:trans(fn) returns a stage to be used inside flow:compose or called later.

flow:throttle_with(period_fn, source)

  • period_fn is invoked only after a successful emission; if it returns ms, the next allowed instant becomes now + ms milliseconds.
  • The first item is always allowed immediately.
  • period_fn must return a non-negative number (int/long/float).
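The rules above can be sketched in Rust as follows. `ThrottleWith`, `poll_at`, and `Step` are hypothetical names, the period is taken as milliseconds in an `f64`, and rejecting invalid periods with an error variant is an assumption about how a bad period_fn result might be surfaced.

```rust
use std::time::{Duration, Instant};

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
    Error(String),
}

// Hypothetical model of flow:throttle_with(period_fn, source).
struct ThrottleWith<T> {
    source: Box<dyn FnMut() -> Step<T>>,
    period_fn: Box<dyn FnMut() -> f64>, // returns milliseconds
    next_allowed: Option<Instant>,      // None: the first item is allowed immediately
}

impl<T> ThrottleWith<T> {
    fn new(
        period_fn: impl FnMut() -> f64 + 'static,
        source: impl FnMut() -> Step<T> + 'static,
    ) -> Self {
        ThrottleWith { source: Box::new(source), period_fn: Box::new(period_fn), next_allowed: None }
    }

    fn poll_at(&mut self, now: Instant) -> Step<T> {
        if let Some(t) = self.next_allowed {
            if now < t {
                return Step::Again;
            }
        }
        match (self.source)() {
            Step::Item(x) => {
                // period_fn runs only after a successful emit; its value sets the next window.
                let ms = (self.period_fn)();
                if !(ms.is_finite() && ms >= 0.0) {
                    // In this sketch the pending item is dropped and the stage reports an error.
                    return Step::Error(format!("period must be a non-negative number, got {ms}"));
                }
                self.next_allowed = Some(now + Duration::from_secs_f64(ms / 1000.0));
                Step::Item(x)
            }
            other => other, // AGAIN / EOF / errors pass through without calling period_fn
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let mut delay = 0.0;
    let mut n = 0;
    let mut gate = ThrottleWith::new(
        move || { delay += 10.0; delay }, // period grows by 10 ms after each emit
        move || { n += 1; Step::Item(n) },
    );
    for ms in [0, 9, 10, 29, 30] {
        // Item(1), Again, Item(2), Again, Item(3)
        println!("t0+{ms}ms: {:?}", gate.poll_at(t0 + Duration::from_millis(ms)));
    }
}
```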

flow:stop_when(pred)

  • When pred(item) is truthy, the stage terminates by returning NO_MORE_DATA; the triggering item is not emitted.
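A minimal Rust model of that contract: once the predicate fires, the stage returns EOF for the triggering pull and for every later pull. `stop_when` and `Step` here are hypothetical stand-ins, not the code in stop_when.rs.

```rust
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Step<T> {
    Item(T),
    Again,
    NoMoreData,
}

// Hypothetical model of flow:stop_when(pred, source).
fn stop_when<T>(
    pred: impl Fn(&T) -> bool,
    mut source: impl FnMut() -> Step<T>,
) -> impl FnMut() -> Step<T> {
    let mut stopped = false;
    move || {
        if stopped {
            return Step::NoMoreData; // stay terminated; never pull upstream again
        }
        match source() {
            Step::Item(x) if pred(&x) => {
                stopped = true;
                Step::NoMoreData // the triggering item is swallowed, not emitted
            }
            other => other,
        }
    }
}

fn main() {
    let mut n = 0;
    let mut s = stop_when(|x: &i32| *x > 3, move || { n += 1; Step::Item(n) });
    while let Step::Item(x) = s() {
        println!("{x}"); // 1, 2, 3
    }
}
```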

Examples (from examples/)

The repo contains many runnable .mu scripts demonstrating the stages end-to-end. Here are a few tiny, illustrative snippets. These assume the MuMu/Lava runtime has the appropriate plugins loaded (e.g. extend("flow"), extend("net")).

Transform + Filter + Throttle + Drain

extend("flow")

run = flow:compose(
  flow:drain_async,          # drain in the background
  flow:chain(slog),          # print items
  flow:throttle(10),         # non-blocking 10ms gate
  flow:filter(x => x % 2)    # keep odd numbers
)

run(step(1, 100))            # -> 1,3,5,...

Dynamic period throttling

extend("flow")

^delay = 1
pipe = flow:compose(
  flow:drain_async,
  flow:chain(_ => delay += 50),
  flow:throttle_with(() => delay),  # period grows after each emit
  flow:chain(slog)
)

pipe(step(1, 20))

Stop on condition

extend("flow")

^count = 0
flow:compose(
  flow:drain_async,
  flow:chain(_ => slog(++count)),
  flow:stop_when(_ => count > 10)
)(step(1, 99999))

Chunked file reading

extend("flow")

pipeline = flow:compose(
  flow:drain_async,
  flow:chain(sput),
  flow:throttle(10),
  flow:read(64)            # 64-byte chunks, UTF-8 lossy
)

pipeline("README.md")

Zipped applicative

extend("flow")

fs = flow:of(x => x * 10)       # emits a single function
xs = step(1, 5)                  # 1..4

flow:to_array(flow:ap_zip(fs, xs))   # => [10, 20, 30, 40]

See examples/ for more: async drains with callbacks, net ping/LLDP demos, partial composition patterns, reduction, etc.


Design notes

  • Cooperative scheduling
    Every stage avoids blocking the interpreter loop. Throttles use AGAIN; async drains use tiny micro-bursts governed by LAVA_FLOW_BURST and LAVA_FLOW_BUDGET_US (defaults: 64 items, 500µs).

  • Typed collections
    Collectors/transformers strive to produce the tightest possible array type (IntArray, FloatArray, StrArray). Heterogeneous results fall back to MixedArray (or error in strict collectors).

  • File I/O
    flow:read reads raw bytes via a BufReader and converts each chunk to a string using UTF-8 lossy decoding for maximal robustness.

  • Correctness guards
    For example, flow:throttle rejects negative periods; flow:filter enforces that predicates are functions; flow:to_array errors on mixed element types; flow:slice validates both skip and count.
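The typed-collection rule can be illustrated with a small Rust model. `Value`, `TypedArray`, and `collect_typed` are hypothetical names that only mirror the IntArray/FloatArray/StrArray/MixedArray idea; `strict = true` mimics flow:to_array's error on mixed types, and treating an empty input as an empty IntArray is an arbitrary choice of this sketch.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Float(f64),
    Str(String),
}

#[derive(Debug, PartialEq)]
enum TypedArray {
    Int(Vec<i64>),
    Float(Vec<f64>),
    Str(Vec<String>),
    Mixed(Vec<Value>),
}

fn kind(v: &Value) -> &'static str {
    match v {
        Value::Int(_) => "int",
        Value::Float(_) => "float",
        Value::Str(_) => "str",
    }
}

// `strict` mimics flow:to_array (error on mixed types); otherwise fall back to Mixed.
fn collect_typed(items: Vec<Value>, strict: bool) -> Result<TypedArray, String> {
    let first = items.first().map(kind);
    if items.iter().any(|v| Some(kind(v)) != first) {
        return if strict {
            Err("mixed element types".to_string())
        } else {
            Ok(TypedArray::Mixed(items))
        };
    }
    let (mut ints, mut floats, mut strs) = (Vec::new(), Vec::new(), Vec::new());
    for v in items {
        match v {
            Value::Int(i) => ints.push(i),
            Value::Float(f) => floats.push(f),
            Value::Str(s) => strs.push(s),
        }
    }
    Ok(match first {
        Some("float") => TypedArray::Float(floats),
        Some("str") => TypedArray::Str(strs),
        _ => TypedArray::Int(ints), // empty input becomes an empty IntArray (arbitrary choice)
    })
}

fn main() {
    let xs = vec![Value::Int(1), Value::Int(2)];
    println!("{:?}", collect_typed(xs, true)); // Ok(Int([1, 2]))
    let mixed = vec![Value::Int(1), Value::Str("a".into())];
    println!("{:?}", collect_typed(mixed, false)); // Ok(Mixed([Int(1), Str("a")]))
}
```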


Project structure

src/
  lib.rs            # plugin entry point, function registrations
  trans.rs          # map (eager on 2-arg), and alias registration
  filter.rs         # predicate-based filter
  slice.rs          # windowing (skip/take)
  to_array.rs       # collectors
  reduce.rs         # eager reduce
  throttle.rs       # fixed-period non-blocking throttle
  throttle_with.rs  # dynamic-period non-blocking throttle
  chain.rs          # flatMap (value/iterator/transform to stream)
  concat.rs         # sequential concatenation
  of.rs             # one-shot iterator
  empty.rs          # empty iterator (EOF)
  ap.rs             # applicative: zip/cartesian
  drain.rs          # blocking drain to count
  drain_async.rs    # async micro-burst draining
  drain_then.rs     # async draining with required callback on EOF
  read.rs           # chunked file reader (UTF-8 lossy)
  stop_when.rs      # early termination stage
tests/              # MuMu specs covering compose, trans, slice, filter, throttle, etc.
examples/           # many runnable MuMu scripts (pipelines, net demos, etc.)

Compatibility & features

  • MuMu core: targets core-mumu = 0.9.0-rc.3 (aliased as mumu in code).
  • Features:
    • Default: no features (lightweight; avoids host-only deps).
    • host: enable for native .so/.dll builds; mirrors the MuMu host ABI.
    • web: marker for WASM/web builds (no host-only deps).
  • Error constants: uses string errors "AGAIN" and "NO_MORE_DATA" consistently across stages.

Contributing

Contributions are welcome! Please open issues or MRs on GitLab.

  • Repository: https://gitlab.com/tofo/mumu-flow
  • Primary author: Tom Fotheringham
  • Contributors: the many folks filing issues, writing tests, and improving the MuMu ecosystem — thank you ❤️

Before submitting, please:

  • keep functions non-blocking and compatible with the interpreter’s poll loop,
  • prefer partials and underscore placeholders where it improves ergonomics,
  • include examples and tests for new stages.

License

This project is dual-licensed under MIT and Apache-2.0. You may choose either license. See LICENSE for details.

