| Crates.io | join_me_maybe |
| lib.rs | join_me_maybe |
| version | 0.4.0 |
| created_at | 2025-12-03 06:04:43.329589+00 |
| updated_at | 2026-01-21 06:06:28.407942+00 |
| description | an async `join!` macro with `select!`-like features |
| homepage | |
| repository | https://github.com/oconnor663/join_me_maybe |
| max_upload_size | |
| id | 1963428 |
| size | 74,424 |
join_me_maybe! join_me_maybe provides an expanded version of the futures::join!/tokio::join! macro,
with added features for cancellation and working with streams. Programs that need this sort of
control flow often resort to "select! in a loop" and/or "select! by reference", but those
come with a notoriously long list of footguns.[1][2][3]
The goal of join_me_maybe is to be more convenient and less error-prone than select! in its
most common applications. The stretch goal is to make the case that select!-by-reference in
particular isn't usually necessary and should be considered harmful.
The basic use case works like other join! macros, polling each of its arguments to completion
and returning their outputs in a tuple.
use join_me_maybe::join;
use tokio::time::{sleep, Duration};
// Create a couple futures, one that's ready immediately, and another that takes some time.
let future1 = std::future::ready(1);
let future2 = async { sleep(Duration::from_millis(100)).await; 2 };
// Run them concurrently and wait for both of them to finish.
let (a, b) = join!(future1, future2);
assert_eq!((a, b), (1, 2));
maybe cancellationIf you don't want to wait for all of your futures finish, you can use the maybe keyword. This
can be useful with infinite loops of background work that never actually exit. The outputs of
maybe futures are wrapped in Option.
let outputs = join!(
// This future isn't `maybe`, so we'll definitely wait for it to finish.
async { sleep(Duration::from_millis(100)).await; 1 },
// Same here.
async { sleep(Duration::from_millis(200)).await; 2 },
// We won't necessarily wait for this `maybe` future, but in practice it'll finish before
// the "definitely" futures above, and we'll get its output wrapped in `Some()`.
maybe async { sleep(Duration::from_millis(10)).await; 3 },
// This `maybe` future never finishes. We'll cancel it when the "definitely" work is done.
maybe async {
loop {
// Some periodic work...
sleep(Duration::from_millis(10)).await;
}
},
);
assert_eq!(outputs, (1, 2, Some(3), None));
label: and .cancel()You can also cancel futures by name if you label: them. The outputs of labeled futures are
wrapped in Option too.
let mutex = tokio::sync::Mutex::new(42);
let outputs = join!(
// The `foo:` label here means that all future expressions (including this one) have a
// `foo` object in scope, which provides a `.cancel()` method.
foo: async {
let mut guard = mutex.lock().await;
*guard += 1;
// Selfishly hold the lock for a long time.
sleep(Duration::from_secs(1_000_000)).await;
},
async {
// Give `foo` a little bit of time...
sleep(Duration::from_millis(100)).await;
if mutex.try_lock().is_err() {
// Hmm, `foo` is taking way too long. Cancel it!
foo.cancel();
}
// Cancelling `foo` drops it promptly, which releases the lock. Note that if it only
// stopped polling `foo`, but didn't drop it, this would be a deadlock. This is a
// common footgun with `select!`-in-a-loop.
*mutex.lock().await
},
);
assert_eq!(outputs, (None, 43));
A .cancel()ed future won't be polled again, and it'll be dropped promptly, freeing any locks
or other resources that it might be holding. Note that if a future cancels itself, its
execution still continues as normal after .cancel() returns, up until the next .await
point. This can be useful in closure bodies or nested async blocks, where return or break
doesn't work.
=>One of the powerful features of select! is that its arm bodies get exclusive mutable access
to the enclosing scope. join_me_maybe! supports an expanded => syntax that works similarly:
let mut counter = 0;
let output = join!(
n = std::future::ready(1) => {
counter += n;
42
},
m = async {
sleep(Duration::from_millis(1)).await;
1
} => counter += m, // Mutate the same `counter` in both arms.
);
assert_eq!(counter, 2);
// When a `=>` body is present, the output is the value of the body.
assert_eq!(output, (42, ()));
Also like select!, it's possible to .await or return in an arm body. (However, break or
continue in a containing loop are not supported.) Note that a return short-circuits the
whole containing function, not just the join!. This is useful for error handling:
async fn foo() -> std::io::Result<()> {
let _: ((), bool) = join!(
_ = std::future::ready(1) => {
sleep(Duration::from_millis(1)).await;
return Ok(()); // Return from `foo` (the whole function, not just the `join!`).
},
_ = std::future::ready(2) => {
std::fs::exists("fallible.txt")? // Error handling with `?` also works.
},
);
unreachable!("`return` short-circuits above.");
}
Shared mutation from different arm bodies wouldn't be possible if they ran concurrently.
Instead, join! only runs one arm body at a time. This is a potential source of surprising
timing bugs, and it's best to avoid .awaiting in arm bodies if you have a choice. However,
arm bodies and "scrutinees" (the futures to the left of the =>) run concurrently.
Similar to the => syntax for futures above, you can also drive a stream, using <pattern> in <stream> instead of <pattern> = <future>. In this case the following expression executes for
each item in the stream. As above, bodies get mutable access to the environment and can
.await or return:
use futures::stream;
let mut total = 0;
join!(
n in stream::iter([1, 2, 3]) => total += n,
m in stream::iter([4, 5, 6]) => total += m,
);
assert_eq!(total, 21);
You can optionally follow this syntax with the finally keyword and another expression that
executes after the stream is finished (if it's not cancelled). Streams have no return value by
default, but streams with a finally expression take the value of that expression:
let ret = join!(
// This stream has no `finally` expression, so it returns `()`.
_ in stream::iter([42]) => {},
// This arm's `finally` expression is `1`, and the `maybe` means we get `Some(1)`.
maybe _ in stream::iter([42]) => {} finally 1,
// Same, but without `maybe` we get the unwrapped value.
_ in stream::iter([42]) => {} finally 2,
// All the streams above finish immediately, so this `maybe` stream gets cancelled and
// returns `None` instead of evaluating its `finally` expression.
maybe _ in stream::iter([42]) => {} finally 3,
);
assert_eq!(ret, ((), Some(1), 2, None));
Here's an example of driving a stream together with label:/.cancel(), which works with
streams like it does with futures:
use futures::stream::{self, StreamExt};
let mut counter = 0;
join!(
my_stream: _ in stream::iter(0..5).then(async |_| {
sleep(Duration::from_millis(10)).await
}) => {
// This stream gets cancelled below, so this only executes three times.
counter += 1;
} finally {
// This stream gets cancelled below, so this will never execute.
counter += 1_000_000;
},
async {
// Wait long enough for the stream to yield three items, then cancel it.
sleep(Duration::from_millis(35)).await;
my_stream.cancel();
},
);
assert_eq!(counter, 3);
This feature is even more experimental than everything else above. In arm bodies
(blocks/expressions after => and finally), label: cancellers support an additional
method: with_pin_mut. This takes a closure and invokes it with an Option<Pin<&mut T>>
pointing to the corresponding future or stream. (None if that arm is already completed or
cancelled.) You can use this to mutate e.g. a FuturesUnordered or a StreamMap to add
more work to it while it's being polled. (Not literally while it's being polled -- everything
in a join! runs on one thread -- but while it's owned by join! and guaranteed not to be
"snoozed".) This is intended as an alternative to patterns that await futures by reference,
which tends to be prone to "snoozing" mistakes.
Unfortunately, streams that you can add work to dynamically are usually "poorly behaved" in the
sense that they often return Ready(None) for a while, until more work is eventually added and
they start returning Ready(Some(_)) again. This is at odds with the usual rule that you
shouldn't poll a stream again after it returns Ready(None), but it does work with
select!-in-a-loop. (In Tokio it requires an if guard, and with futures::select! it leans
on the "fused" requirement.) However, it does not naturally work with join_me_maybe, which
interprets Ready(None) as "end of stream" and promptly drops the whole stream. (Like it's
supposed to!) For a stream to work well with this feature, it needs to do two
things that as far as I know none of the dynamic streams currently do:
Ready(Some(_)) or Pending, until you somehow inform
it that no more work is coming, using e.g. a .close() method or something like that. After
that the stream should probably drain its remaining work before returning Ready(None). (If
the caller doesn't want to wait for remaining work, they can cancel the stream instead.)Pending, the
stream should stash a Waker and invoke it whenever work is added.Adapting a stream that doesn't behave this way is complicated and not obviously a good idea.
See tests/test.rs for some examples. Manually tracking Wakers is exactly the
sort of error-prone business that this crate wants to discourage, and this whole feature will
need a lot of baking before I can recommend it. However, this approach is necessary to solve
cases like Niko Matsakis' case study of pub-sub in mini-redis.
no_stdjoin_me_maybe doesn't heap allocate and is compatible with #![no_std].
As far as I know, there's no way for the join! macro to support something like this today
(this works in arm bodies, but not as here in the "scrutinee" position):
let mut x = 0;
join!(
async { x += 1; },
async { x += 1; }, // error: cannot borrow `x` as mutable more than once at a time
);
assert_eq!(x, 2);
The problem is that both futures want to capture &mut x, which violates the mutable aliasing
rule. However, that arguably borrows too much. Consider:
&mut x across an .await point. In other words,
the borrow checker would accept any interleaving of their "basic blocks" in a
single-threaded context.Instead of each inner future capturing &mut x, the outer join future could capture it once,
and the inner futures could "reborrow" it in some sense when they're polled. My guess is that
there's no practical way for a macro to express this in Rust today (corrections welcome!), but
the Rust compiler could add a hypothetical syntax like this:
let mut x = 0;
let mut y = 0;
concurrent_bikeshed {
{
// `x` is not borrowed across the `.await`...
x += 1;
// ...but `y` is.
let y_ref = &mut y;
sleep(Duration::from_secs(1)).await;
*y_ref += 1;
x += 1;
},
{
// Mutating `x` here does not conflict...
x += 1;
// ...but trying to mutate `y` here would conflict.
// y += 1;
},
}
Another big advantage of adding dedicated syntax for this is that it could support
return/break/continue as usual to diverge from inside any arm. That would be especially
helpful for error handling with ?, which is awkward in a lot of concurrent contexts today.