| Crates.io | maiko |
| lib.rs | maiko |
| version | 0.1.1 |
| created_at | 2025-08-14 16:11:29.361999+00 |
| updated_at | 2025-12-18 22:59:28.490818+00 |
| description | Lightweight event-driven actor runtime with topic-based pub/sub for Tokio |
| homepage | https://github.com/ddrcode/maiko |
| repository | https://github.com/ddrcode/maiko |
| max_upload_size | |
| id | 1795065 |
| size | 74,363 |
Maiko is a lightweight actor runtime for building event-driven concurrent systems in Rust. Unlike actors in traditional frameworks (usually inspired by Erlang), Maiko actors communicate through topic-based pub/sub rather than direct addressing, which keeps them loosely coupled and makes them well suited to stream-processing workloads.
Building complex Tokio applications often leads to channel spaghetti:
// Without Maiko: Manual channel orchestration
let (tx1, rx1) = mpsc::channel(32);
let (tx2, rx2) = mpsc::channel(32);
let (tx3, rx3) = mpsc::channel(32);
let tx1_clone = tx1.clone();
let tx2_clone = tx2.clone();
tokio::spawn(async move {
// Task A needs to send to B and C...
tx1_clone.send(data.clone()).await?;
tx2_clone.send(data).await?;
});
tokio::spawn(async move {
// Task B needs rx1 and tx3...
while let Some(msg) = rx1.recv().await {
tx3.send(process(msg)).await?;
}
});
// ... and it gets worse with more tasks
With Maiko, channels disappear from your code:
// Actors just subscribe to topics - Maiko handles all routing
sup.add_actor("task_a", |ctx| TaskA { ctx }, &[Topic::Input])?;
sup.add_actor("task_b", |ctx| TaskB { ctx }, &[Topic::Processed])?;
sup.add_actor("task_c", |ctx| TaskC { ctx }, &[Topic::Input, Topic::Processed])?;
// No manual channel creation, cloning, or wiring needed!
Maiko manages the entire channel topology internally, letting you focus on business logic instead of coordination.
Maiko (舞妓) are traditional Japanese performers known for their coordinated dances and artistic discipline. Like maiko who respond to music and each other in harmony, Maiko actors coordinate through events in the Tokio runtime.
Maiko excels at processing unidirectional event streams where actors don't need to know about each other:
Feature Comparison:
| Feature | Maiko | Actix | Ractor | Tokio Channels |
|---|---|---|---|---|
| Pub/Sub Topics | ✅ | ❌ | ❌ | ❌ |
| Actor Addressing | ❌ | ✅ | ✅ | N/A |
| Supervision Trees | ❌ | ✅ | ✅ | N/A |
| Loose Coupling | ✅ | ❌ | ❌ | ✅ |
| Event Metadata | ✅ | ❌ | ❌ | ❌ |
| Correlation Tracking | ✅ | ❌ | ❌ | ❌ |
| Type-Safe Routing | ✅ | ✅ | ✅ | ✅ |
| Learning Curve | Low | Medium | Low | Low |
Add Maiko to your Cargo.toml by running:
cargo add maiko
use maiko::*;
// Define your events
#[derive(Event, Clone, Debug)]
enum MyEvent {
Hello(String),
}
// Create an actor
struct Greeter;
impl Actor for Greeter {
type Event = MyEvent;
async fn handle(&mut self, event: &Self::Event, meta: &Meta) -> Result<()> {
match event {
MyEvent::Hello(name) => {
println!("Hello, {}! (from {})", name, meta.actor_name());
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let mut sup = Supervisor::<MyEvent>::default();
// Add actor and subscribe it to all topics (DefaultTopic)
sup.add_actor("greeter", |_ctx| Greeter, &[DefaultTopic])?;
sup.start().await?;
sup.send(MyEvent::Hello("World".into())).await?;
// Graceful shutdown (it attempts to process all events already in the queue)
sup.stop().await?;
Ok(())
}
See the examples/ directory for complete programs:
pingpong.rs - Simple event exchange between actors
guesser.rs - Multi-actor game with topics and timing
Run examples with:
cargo run --example pingpong
cargo run --example guesser
Events are messages that flow through the system. They must implement the Event trait:
#[derive(Event, Clone, Debug)]
enum NetworkEvent {
PacketReceived(Vec<u8>),
ConnectionClosed(u32),
Error(String),
}
Topics route events to interested actors. Define custom topics for fine-grained control:
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
enum NetworkTopic {
Ingress,
Egress,
Control,
}
impl Topic<NetworkEvent> for NetworkTopic {
fn from_event(event: &NetworkEvent) -> Self {
match event {
NetworkEvent::PacketReceived(_) => NetworkTopic::Ingress,
NetworkEvent::ConnectionClosed(_) => NetworkTopic::Control,
NetworkEvent::Error(_) => NetworkTopic::Control,
}
}
}
Or use DefaultTopic to broadcast to all actors:
sup.add_actor("processor", factory, &[DefaultTopic])?;
Actors are independent units that process events asynchronously:
struct PacketProcessor {
ctx: Context<NetworkEvent>,
stats: PacketStats,
}
impl Actor for PacketProcessor {
type Event = NetworkEvent;
async fn on_start(&mut self) -> Result<()> {
println!("Processor starting...");
Ok(())
}
async fn handle(&mut self, event: &Self::Event, meta: &Meta) -> Result<()> {
match event {
NetworkEvent::PacketReceived(data) => {
self.stats.increment();
self.process_packet(data).await?;
}
_ => {}
}
Ok(())
}
async fn tick(&mut self) -> Result<()> {
// Called periodically when no events are queued
// Useful for polling external sources, timeouts, housekeeping, etc.
if self.stats.should_report() {
println!("Processed {} packets", self.stats.count);
}
Ok(())
}
async fn on_stop(&mut self) -> Result<()> {
println!("Final count: {}", self.stats.count);
Ok(())
}
}
The tick() method runs when the event queue is empty, making it perfect for polling external sources, handling timeouts, and other periodic housekeeping.
The Context provides actors with capabilities:
// Send events to topics
ctx.send(NetworkEvent::PacketReceived(data)).await?;
// Send with correlation (for tracking related events)
ctx.send_child_event(NetworkEvent::Response(data)).await?;
// Check if system is still running
if !ctx.is_alive() {
return Ok(());
}
// Get actor's name
let name = ctx.name();
The Supervisor manages actor lifecycles:
let mut sup = Supervisor::<NetworkEvent>::new();
// Add actors with subscriptions
sup.add_actor("ingress", |ctx| IngressActor::new(ctx), &[NetworkTopic::Ingress])?;
sup.add_actor("egress", |ctx| EgressActor::new(ctx), &[NetworkTopic::Egress])?;
sup.add_actor("monitor", |ctx| MonitorActor::new(ctx), &[DefaultTopic])?;
// Start all actors
sup.start().await?;
// ... application runs ...
// Graceful shutdown
sup.stop().await?;
Maiko actors typically follow one of two patterns:
Handle-Heavy Actors (Event Processors):
// Telemetry actor - processes many incoming events
impl Actor for TelemetryCollector {
type Event = MetricEvent;
async fn handle(&mut self, event: &Self::Event, _meta: &Meta) -> Result<()> {
// Main logic here - process incoming metrics
self.export_to_otel(event).await?;
self.aggregate(event);
Ok(())
}
async fn tick(&mut self) -> Result<()> {
// Minimal - just periodic cleanup
self.flush_buffer().await
}
}
Tick-Heavy Actors (Event Producers):
// Stock data reader - polls external source, emits many events
impl Actor for StockReader {
type Event = StockEvent;
async fn handle(&mut self, event: &Self::Event, _meta: &Meta) -> Result<()> {
// Rarely receives events - maybe just control messages
if let StockEvent::Stop = event {
self.websocket.close().await?;
}
Ok(())
}
async fn tick(&mut self) -> Result<()> {
// Main logic here - poll WebSocket and emit events
if let Some(tick) = self.websocket.next().await {
let event = StockEvent::Tick(tick.symbol, tick.price);
self.ctx.send(event).await?;
}
Ok(())
}
}
Maiko actors don't know about each other. They only know about the events they handle and the topics they subscribe to.
This is fundamentally different from Akka/Actix where actors have addresses:
// Traditional actors (tight coupling)
actor_ref.tell(message); // Must know the actor's address
// Maiko (loose coupling)
ctx.send(event).await?; // Only knows about event types
Events typically flow in one direction:
System Event → Parser → Validator → Processor → Logger
This makes Maiko ideal for pipeline architectures and stream processing.
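As a sketch of what such wiring could look like (PipelineEvent, PipelineTopic, and the stage actors below are hypothetical names, not part of Maiko's API), each stage simply subscribes to the topic produced by the previous one:
// Hypothetical pipeline wiring; PipelineTopic::from_event would map each
// event to the topic consumed by the next stage.
let mut sup = Supervisor::<PipelineEvent>::default();
sup.add_actor("parser", |ctx| Parser { ctx }, &[PipelineTopic::Raw])?;
sup.add_actor("validator", |ctx| Validator { ctx }, &[PipelineTopic::Parsed])?;
sup.add_actor("processor", |ctx| Processor { ctx }, &[PipelineTopic::Validated])?;
sup.add_actor("logger", |ctx| Logger { ctx }, &[DefaultTopic])?; // sees every event
sup.start().await?;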
This means Maiko may not be the best fit for request-response patterns. Request-response is possible in principle (with two separate event types), but it's not the primary use case, and solutions like Actix Web or Ractor are better suited for it.
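If you do occasionally need a request/response exchange, one possible sketch combines two event variants with correlation via send_child_event; QueryEvent, Responder, and its fields below are illustrative assumptions, not part of Maiko's API:
// Illustrative only: request/response modeled as two event variants.
#[derive(Event, Clone, Debug)]
enum QueryEvent {
    Request(String),
    Response(String),
}
struct Responder {
    ctx: Context<QueryEvent>,
    answers: std::collections::HashMap<String, String>,
}
impl Actor for Responder {
    type Event = QueryEvent;
    async fn handle(&mut self, event: &Self::Event, _meta: &Meta) -> Result<()> {
        if let QueryEvent::Request(q) = event {
            let answer = self.answers.get(q).cloned().unwrap_or_default();
            // Reply as a child event so it shares the request's correlation id
            self.ctx.send_child_event(QueryEvent::Response(answer)).await?;
        }
        Ok(())
    }
}
The requester then subscribes to the topic carrying Response events and matches replies to its requests via the correlation id (see Correlation Tracking below).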
The tick() method runs in a select! loop alongside event reception. What you .await inside determines when your actor wakes:
Pattern 1: Time-Based Producer
impl Actor for HeartbeatActor {
async fn tick(&mut self) -> Result<()> {
tokio::time::sleep(Duration::from_secs(5)).await; // Wakes every 5s
self.ctx.send(HeartbeatEvent).await
}
}
Pattern 2: External Event Source
impl Actor for WebSocketReader {
async fn tick(&mut self) -> Result<()> {
let frame = self.socket.read().await?; // Wakes when data arrives
self.ctx.send(FrameEvent(frame)).await
}
}
Pattern 3: Pure Event Processor
impl Actor for EventLogger {
async fn tick(&mut self) -> Result<()> {
self.ctx.pending().await // Never wakes - only handles events (default behavior)
}
async fn handle(&mut self, event: &Self::Event, _meta: &Meta) -> Result<()> {
log::info!("Event: {:?}", event); // All logic in handle()
Ok(())
}
}
Pattern 4: Housekeeping After Events
impl Actor for BufferedWriter {
async fn tick(&mut self) -> Result<()> {
// Returns immediately - called after processing event batches
if self.buffer.len() > 100 {
self.flush().await?;
}
Ok(())
}
async fn handle(&mut self, event: &Self::Event, _meta: &Meta) -> Result<()> {
self.buffer.push(event.clone());
Ok(())
}
}
Key insight: ctx.pending() is more ergonomic than std::future::pending() since it returns Result<()> to match the trait signature.
Track related events across actors:
async fn handle(&mut self, event: &Self::Event, meta: &Meta) -> Result<()> {
if let Some(correlation_id) = meta.correlation_id() {
println!("Event chain: {}", correlation_id);
}
// Child events inherit correlation
self.ctx.send_child_event(ResponseEvent::Ok).await?;
Ok(())
}
Control error propagation:
impl Actor for MyActor {
// ...
fn on_error(&self, error: Error) -> Result<()> {
match error {
Error::Recoverable(_) => {
eprintln!("Warning: {}", error);
Ok(()) // Swallow error, continue
}
Error::Fatal(_) => {
eprintln!("Fatal: {}", error);
Err(error) // Propagate error, stop actor
}
}
}
}
Fine-tune actor behavior:
let config = Config::default()
.with_channel_size(100) // Event queue size per actor
.with_max_events_per_tick(50); // Events processed per tick cycle
let mut sup = Supervisor::new(config);
Contributions are welcome! Please feel free to:
Maiko is 100% human-written code, crafted with passion for Rust and genuine love for coding. While AI tools have been valuable for architectural discussions, code reviews, and documentation, every line of implementation code comes from human creativity and expertise.
We believe in:
Contributors are expected to write their own code. AI may assist with reviews, discussions, and documentation, but implementations should reflect your own understanding and skills.
Maiko is built with ❤️ by humans, for humans 🦀
Inspired by:
Licensed under the MIT License.