| Crates.io | bevy_event_bus_derive |
| lib.rs | bevy_event_bus_derive |
| version | 0.2.0 |
| created_at | 2025-09-18 10:38:30.016252+00 |
| updated_at | 2025-09-19 08:57:17.746274+00 |
| description | Procedural macros for bevy_event_bus |
| homepage | https://github.com/JViggiani/bevy_event_bus |
| repository | https://github.com/JViggiani/bevy_event_bus |
| max_upload_size | |
| id | 1844546 |
| size | 18,170 |
A Bevy plugin that connects Bevy's event system to external message brokers like Kafka.
Seamless integration with Bevy's event system
Automatic event registration
Derive ExternalBusEvent on your event types
Use Serialize and Deserialize from serde for external broker compatibility
Topic-based messaging
Error handling
Send results can be discarded explicitly (let _ = writer.write(...))
Backends
Add to your Cargo.toml:
[dependencies]
bevy_event_bus = "0.1"
With Kafka support:
[dependencies]
bevy_event_bus = { version = "0.1", features = ["kafka"] }
use bevy::prelude::*;
use bevy_event_bus::prelude::*;
use serde::{Deserialize, Serialize};

// Define an event - no manual registration needed!
#[derive(ExternalBusEvent, Serialize, Deserialize, Clone, Debug)]
struct PlayerLevelUpEvent {
    entity_id: u64,
    new_level: u32,
}
use bevy::prelude::*;
use bevy_event_bus::prelude::*;

// Example components read by the sending system. These are illustrative
// definitions for this example, not part of bevy_event_bus.
#[derive(Component)]
struct PlayerXp(u32);

#[derive(Component)]
struct PlayerLevel {
    current: u32,
    next_level_requirement: u32,
}

fn main() {
    // Create a Kafka configuration
    let kafka_config = KafkaConfig {
        bootstrap_servers: "localhost:9092".to_string(),
        group_id: "bevy_game".to_string(),
        ..Default::default()
    };

    // Create the Kafka backend
    let kafka_backend = KafkaEventBusBackend::new(kafka_config);

    App::new()
        .add_plugins(EventBusPlugins(kafka_backend))
        .add_systems(Update, (player_level_up_system, handle_level_ups))
        .run();
}

// System that sends events
fn player_level_up_system(
    mut ev_writer: EventBusWriter<PlayerLevelUpEvent>,
    query: Query<(Entity, &PlayerXp, &PlayerLevel)>,
) {
    for (entity, xp, level) in query.iter() {
        if xp.0 >= level.next_level_requirement {
            // Send to a specific topic; this also triggers Bevy's event system
            let _ = ev_writer.write(
                "game-events.level-up",
                PlayerLevelUpEvent {
                    entity_id: entity.to_bits(),
                    new_level: level.current + 1,
                },
            );
        }
    }
}

// System that receives events
fn handle_level_ups(mut ev_reader: EventBusReader<PlayerLevelUpEvent>) {
    for event in ev_reader.read("game-events.level-up") {
        println!("Entity {} leveled up to level {}!", event.entity_id, event.new_level);
    }
}
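The topic-keyed write/read pattern used by these two systems can be pictured with a small in-memory model. This is only an illustrative sketch using the standard library: `TopicBus` and its methods are hypothetical names, not part of bevy_event_bus, and the real crate routes messages through the configured broker rather than a local map.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a topic-based bus (not the crate's API).
pub struct TopicBus<T> {
    // Pending messages, grouped by topic name.
    queues: HashMap<String, Vec<T>>,
}

impl<T> TopicBus<T> {
    pub fn new() -> Self {
        Self { queues: HashMap::new() }
    }

    /// Append a message to the queue for `topic`.
    pub fn write(&mut self, topic: &str, message: T) {
        self.queues.entry(topic.to_string()).or_default().push(message);
    }

    /// Drain and return every pending message for `topic`, oldest first.
    pub fn read(&mut self, topic: &str) -> Vec<T> {
        self.queues.remove(topic).unwrap_or_default()
    }
}
```

In this model a reader only sees messages written to the topic name it asks for, which is the same contract the `write("game-events.level-up", ...)` and `read("game-events.level-up")` calls rely on.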
The event bus provides comprehensive error handling for both sending and receiving events. All errors are reported as Bevy events, allowing you to handle them in your systems.
When sending events, errors can occur asynchronously. They are reported as events that you can handle with Bevy's built-in event system:
fn handle_delivery_errors(
    mut delivery_errors: EventReader<EventBusError<PlayerLevelUpEvent>>,
) {
    for error in delivery_errors.read() {
        if error.error_type == EventBusErrorType::DeliveryFailure {
            error!(
                "Message delivery failed to topic '{}' ({}): {}",
                error.topic,
                error.backend.as_deref().unwrap_or("unknown"),
                error.error_message
            );
            // NOTE: delivery errors don't have the original event available
            // Implement your error handling logic:
            // - Retry mechanisms
            // - Circuit breaker patterns
            // - Alerting systems
            // - Fallback strategies
        }
    }
}
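As one possible take on the "circuit breaker patterns" item in the comments above, here is a minimal sketch that opens after a number of consecutive delivery failures. `CircuitBreaker` is a hypothetical helper written for this example, not something bevy_event_bus provides; to share it between systems you would likely store it in a Bevy resource, which is also an assumption here.

```rust
/// Hypothetical circuit breaker: opens after `threshold` consecutive failures.
pub struct CircuitBreaker {
    threshold: u32,
    consecutive_failures: u32,
}

impl CircuitBreaker {
    pub fn new(threshold: u32) -> Self {
        Self { threshold, consecutive_failures: 0 }
    }

    /// Call once per delivery failure reported by the event bus.
    pub fn record_failure(&mut self) {
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
    }

    /// Call when a send is known to have succeeded; closes the breaker.
    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
    }

    /// While open, sending systems should skip writing to the bus.
    pub fn is_open(&self) -> bool {
        self.consecutive_failures >= self.threshold
    }
}
```

A delivery-error handler would call `record_failure()` for each `DeliveryFailure` it reads, and sending systems would check `is_open()` before writing. How and when to close the breaker again is a design choice this sketch leaves open.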
When receiving events, deserialization failures are reported as EventBusDecodeError events:
fn handle_read_errors(mut decode_errors: EventReader<EventBusDecodeError>) {
    for error in decode_errors.read() {
        warn!(
            "Failed to decode message from topic '{}' using decoder '{}': {}",
            error.topic, error.decoder_name, error.error_message
        );
        // You can access the raw message bytes for debugging
        debug!("Raw payload size: {} bytes", error.raw_payload.len());
        // Handle decoding errors:
        // - Log malformed messages for debugging
        // - Implement message format migration logic
        // - Track error rates for monitoring
        // - Skip corrupted messages gracefully
    }
}
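For the "track error rates for monitoring" item, a per-topic counter is often enough to start with. The sketch below uses only the standard library; `DecodeErrorStats` is a hypothetical type for illustration and is not part of bevy_event_bus.

```rust
use std::collections::HashMap;

/// Hypothetical per-topic counter for decode failures.
#[derive(Default)]
pub struct DecodeErrorStats {
    per_topic: HashMap<String, u64>,
}

impl DecodeErrorStats {
    /// Record one decode failure for `topic`.
    pub fn record(&mut self, topic: &str) {
        *self.per_topic.entry(topic.to_string()).or_insert(0) += 1;
    }

    /// Number of failures seen so far for `topic` (0 if none).
    pub fn count(&self, topic: &str) -> u64 {
        self.per_topic.get(topic).copied().unwrap_or(0)
    }

    /// Total failures across all topics.
    pub fn total(&self) -> u64 {
        self.per_topic.values().sum()
    }
}
```

A decode-error handler could call `record(&error.topic)` inside its loop and expose `count` or `total` to whatever monitoring you already use.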
Add all error handling systems to your app:
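use bevy::prelude::*;
use bevy_event_bus::prelude::*;

fn main() {
    // Build the backend as in the setup example (default config shown for brevity)
    let kafka_backend = KafkaEventBusBackend::new(KafkaConfig::default());

    App::new()
        .add_plugins(EventBusPlugins(kafka_backend))
        .add_systems(Update, (
            // A sending system of your own
            send_events_with_error_handling,
            handle_delivery_errors,
            handle_read_errors,
            // Your other systems...
        ))
        .run();
}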
If you don't want to handle errors explicitly, simply don't add error handling systems. The errors will still be logged as warnings but won't affect your application flow:
// Simple usage without explicit error handling
fn simple_event_sending(mut ev_writer: EventBusWriter<PlayerLevelUpEvent>) {
    // Discard the result explicitly, as in the sending system shown earlier
    let _ = ev_writer.write("game-events.level-up", PlayerLevelUpEvent {
        entity_id: 123,
        new_level: 5,
    });
    // Errors are logged but don't need to be handled
}
use std::collections::HashMap;
use bevy_event_bus::prelude::*;

let config = KafkaConfig {
    bootstrap_servers: "localhost:9092".to_string(),
    group_id: "bevy_game".to_string(),
    client_id: Some("game-client".to_string()),
    timeout_ms: 5000,
    additional_config: HashMap::new(),
};
let kafka_backend = KafkaEventBusBackend::new(config);
Reading from a topic automatically subscribes the consumer to that topic on first use.
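Subscribe-on-first-read can be modelled as a set of topic names that is checked on every read. This is a sketch of the general idea under that assumption, not the crate's actual implementation; `LazySubscriptions` is a hypothetical name.

```rust
use std::collections::HashSet;

/// Hypothetical tracker for topics that have already been subscribed to.
pub struct LazySubscriptions {
    subscribed: HashSet<String>,
}

impl LazySubscriptions {
    pub fn new() -> Self {
        Self { subscribed: HashSet::new() }
    }

    /// Returns true only the first time `topic` is seen, i.e. when a real
    /// consumer would need to issue a subscribe call to the broker.
    pub fn ensure_subscribed(&mut self, topic: &str) -> bool {
        self.subscribed.insert(topic.to_string())
    }
}
```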
Use additional_config to pass through arbitrary librdkafka properties (e.g. security, retries, acks).
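A pass-through map for TLS might be assembled as follows. The sketch assumes additional_config is a `HashMap<String, String>`, which this README does not state explicitly, and `tls_properties` is a hypothetical helper; the property names themselves are standard librdkafka keys from the list in this section.

```rust
use std::collections::HashMap;

/// Hypothetical helper: librdkafka properties for a TLS connection.
/// Assumes the config map is keyed and valued by `String`.
pub fn tls_properties(ca_path: &str) -> HashMap<String, String> {
    let mut props = HashMap::new();
    props.insert("security.protocol".to_string(), "SSL".to_string());
    props.insert("ssl.ca.location".to_string(), ca_path.to_string());
    props.insert("enable.idempotence".to_string(), "true".to_string());
    props
}
```

The returned map would then take the place of `HashMap::new()` in the `additional_config` field, provided the field type matches the assumption above.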
Common keys:
enable.idempotence=true
message.timeout.ms=5000 (already set on producer)
security.protocol=SSL
ssl.ca.location=/path/to/ca.pem
ssl.certificate.location=/path/to/cert.pem
ssl.key.location=/path/to/key.pem
You can spin up a single-node Kafka (KRaft) automatically in tests. The test harness will:
Start bitnami/kafka:latest exposing 9092 if KAFKA_BOOTSTRAP_SERVERS is not set.
Manual run:
docker run -d --rm --name bevy_event_bus_kafka -p 9092:9092 \
-e KAFKA_ENABLE_KRAFT=yes \
-e KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv \
-e KAFKA_CFG_PROCESS_ROLES=broker,controller \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true \
bitnami/kafka:latest
Set KAFKA_BOOTSTRAP_SERVERS to override (e.g. in CI):
export KAFKA_BOOTSTRAP_SERVERS=my-broker:9092
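In your own test code, the same override can be honoured with a small helper that falls back to the local broker. This is a sketch with hypothetical function names; the crate's test harness may resolve the variable differently.

```rust
use std::env;

/// Address used when no override is provided.
pub const DEFAULT_BOOTSTRAP_SERVERS: &str = "localhost:9092";

/// Pick the broker address from an optional override, ignoring blank values.
pub fn resolve_bootstrap_servers(override_value: Option<String>) -> String {
    override_value
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_BOOTSTRAP_SERVERS.to_string())
}

/// Read KAFKA_BOOTSTRAP_SERVERS from the environment, with the default fallback.
pub fn bootstrap_servers() -> String {
    resolve_bootstrap_servers(env::var("KAFKA_BOOTSTRAP_SERVERS").ok())
}
```

Keeping the environment lookup separate from the fallback logic makes the latter easy to unit test without touching process-wide state.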
Integration tests use either the Docker harness or an external broker. They generate unique topic names per run to avoid offset collisions.
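One way to get a unique topic name per run is to combine a base name with the process id, a timestamp, and an in-process counter. The sketch below is an assumption about how such names could be built, not the scheme the crate's tests actually use; `unique_topic` is a hypothetical helper.

```rust
use std::process;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Distinguishes topics created within the same process.
static TOPIC_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Hypothetical helper: returns `<base>-<pid>-<nanos>-<sequence>`.
pub fn unique_topic(base: &str) -> String {
    // Nanoseconds since the Unix epoch; falls back to 0 if the clock is
    // set before the epoch, in which case pid and sequence still differ.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let sequence = TOPIC_COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{base}-{}-{nanos}-{sequence}", process::id())
}
```

Because a fresh topic has no committed offsets for the test's consumer group, each run starts reading from a clean state.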
The library includes comprehensive performance tests to measure throughput and latency under various conditions.
# Run all performance benchmarks
./run_performance_tests.sh
# Run specific test
./run_performance_tests.sh test_message_throughput
# Results are automatically saved to event_bus_perf_results.csv
Test: test_message_throughput | Send Rate: 76373 msg/s | Receive Rate: 78000 msg/s | Payload: 100 bytes
Test: test_high_volume_small_messages | Send Rate: 73742 msg/s | Receive Rate: 74083 msg/s | Payload: 20 bytes
Test: test_large_message_throughput | Send Rate: 8234 msg/s | Receive Rate: 8156 msg/s | Payload: 10000 bytes
The performance tests measure:
Performance results are tracked over time with git commit hashes for regression analysis.
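To compare the sample results across payload sizes, it can help to convert message rates into byte throughput. The helpers below are a hypothetical sketch for that arithmetic and are not taken from the benchmark scripts.

```rust
use std::time::Duration;

/// Messages per second for `messages` handled in `elapsed` (0.0 if no time passed).
pub fn messages_per_second(messages: u64, elapsed: Duration) -> f64 {
    let seconds = elapsed.as_secs_f64();
    if seconds == 0.0 {
        return 0.0;
    }
    messages as f64 / seconds
}

/// Byte throughput in MB/s (1 MB = 1,000,000 bytes) for a given message rate.
pub fn megabytes_per_second(rate_msg_per_s: f64, payload_bytes: u64) -> f64 {
    rate_msg_per_s * payload_bytes as f64 / 1_000_000.0
}
```

Applied to the send rates listed above, 76373 msg/s at 100 bytes is about 7.6 MB/s, while 8234 msg/s at 10000 bytes is about 82.3 MB/s, so the large-message test moves more data per second despite its lower message rate.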