| Crates.io | rs-sqs-receiver |
| lib.rs | rs-sqs-receiver |
| version | 0.1.1 |
| created_at | 2025-06-07 11:02:21.086409+00 |
| updated_at | 2025-06-07 12:57:01.887411+00 |
| description | An asynchronous AWS SQS message receiver framework with trait-based handlers and shared resource support |
| homepage | |
| repository | https://github.com/MRDavidYen/rs-sqs-receiver |
| max_upload_size | |
| id | 1703914 |
| size | 124,450 |
An asynchronous AWS SQS message receiver framework for Rust that abstracts SQS polling complexity and allows you to register custom message handlers with shared resources.
Add this to your Cargo.toml:
[dependencies]
rs-sqs-receiver = "0.1.0"
tokio = { version = "1.0", features = ["full"] }
The simplest way to start receiving messages:
use rs_sqs_receiver::{client::create_sqs_client_from_env, receiver::start_receive_queue};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = create_sqs_client_from_env().await;
let queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue";
let database_pool = "shared_db_connection".to_string();
start_receive_queue(
client,
queue_url,
database_pool,
|message, db_pool| async move {
println!("Processing message: {} with DB: {}", message, db_pool);
// Your message processing logic here
Ok(())
}
).await;
Ok(())
}
For more complex scenarios with multiple queues:
use rs_sqs_receiver::{client::create_sqs_client_from_env, receiver::AwsSqsReceiver};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = create_sqs_client_from_env().await;
let mut receiver = AwsSqsReceiver::new();
// Add handler with shared resources
let database_pool = "db_connection".to_string();
receiver.add_handler_fn(
"https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue",
|message: String, db: String| async move {
println!("Processing order: {} with DB: {}", message, db);
Ok(())
},
database_pool,
None, // Optional config parameter
);
// Add simple handler (no shared resources)
receiver.add_simple_handler(
"https://sqs.us-east-1.amazonaws.com/123456789012/notifications-queue",
|message: String| async move {
println!("Sending notification: {}", message);
Ok(())
},
None, // Optional config parameter
);
// Start all handlers (this consumes the receiver)
receiver.start_all_handlers(client).await;
Ok(())
}
Set up your AWS credentials using environment variables:
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-east-1
use rs_sqs_receiver::client::create_sqs_client_with_credentials;
let client = create_sqs_client_with_credentials(
"your_access_key",
"your_secret_key",
"us-east-1"
);
You can customize SQS polling behavior using AwsSqsReceiverConfig:
use rs_sqs_receiver::{
client::create_sqs_client_from_env,
receiver::{AwsSqsReceiver, AwsSqsReceiverConfig}
};
let mut receiver = AwsSqsReceiver::new();
// Create custom configuration
let config = AwsSqsReceiverConfig {
max_number_of_messages: 5, // Receive up to 5 messages per poll (default: 10)
wait_time_seconds: 15, // Wait 15 seconds for messages (default: 20)
};
// Use config with handler
receiver.add_handler_fn(
"https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
|message: String, _: ()| async move {
println!("Processing: {}", message);
Ok(())
},
(),
Some(config), // Pass the config
);
// Or use default configuration by passing None
receiver.add_simple_handler(
"https://sqs.us-east-1.amazonaws.com/123456789012/other-queue",
|message: String| async move {
println!("Processing: {}", message);
Ok(())
},
None, // Use default config (10 messages, 20 second wait)
);
You can share any type that implements Send + Sync + Clone + 'static:
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone)]
struct AppState {
db_pool: Arc<DatabasePool>,
cache: Arc<Mutex<HashMap<String, String>>>,
config: AppConfig,
}
let shared_state = AppState {
db_pool: Arc::new(create_db_pool()),
cache: Arc::new(Mutex::new(HashMap::new())),
config: load_config(),
};
receiver.add_handler_fn(
queue_url,
|message: String, state: AppState| async move {
// Use state.db_pool, state.cache, etc.
Ok(())
},
shared_state,
None, // Optional config parameter
);
Handlers must return Result<(), AwsSqsReceiverError>:
use rs_sqs_receiver::errors::AwsSqsReceiverError;
receiver.add_handler_fn(
queue_url,
|message: String, _shared: ()| async move {
match process_message(&message).await {
Ok(_) => Ok(()),
Err(e) => {
eprintln!("Failed to process message: {}", e);
Err(AwsSqsReceiverError::ProcessingError(e.to_string()))
}
}
},
(),
None, // Optional config parameter
);
Use the shutdown-aware API for graceful application termination:
use tokio::sync::oneshot;
let mut receiver = AwsSqsReceiver::new();
// ... add handlers ...
let (shutdown_tx, shutdown_rx) = oneshot::channel();
// Start handlers with shutdown support
let handler_task = tokio::spawn(async move {
receiver.start_all_handlers_with_shutdown(client, shutdown_rx).await;
});
// Later, trigger shutdown
shutdown_tx.send(()).expect("Failed to send shutdown signal");
// Wait for graceful shutdown
handler_task.await.expect("Handler task failed");
start_receive_queue(): Functional API for single queue processingAwsSqsReceiver::new(): Create a new receiver instanceadd_handler_fn(queue_url, handler_fn, shared_resources, config): Add handler with shared resources and optional configadd_simple_handler(queue_url, handler_fn, config): Add handler without shared resources, with optional configstart_all_handlers(): Start all registered handlersstart_all_handlers_with_shutdown(): Start with graceful shutdown supportAwsSqsReceiverConfig: Configuration struct for customizing SQS polling behavior
max_number_of_messages: Maximum messages per poll (1-10, default: 10)wait_time_seconds: Long polling wait time in seconds (0-20, default: 20)create_sqs_client_from_env(): Create client from environment variablescreate_sqs_client_with_credentials(): Create client with explicit credentials# Build the project
cargo build
# Run tests
cargo test
# Run integration tests (requires AWS credentials)
TEST_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/test-queue cargo test
# Format code
cargo fmt
# Run clippy lints
cargo clippy
Integration tests require AWS credentials and a test SQS queue. Set the TEST_SQS_QUEUE_URL environment variable to your test queue URL.
aws-sdk-sqs and aws-config for SQS operationstokio with full featuresthiserror for ergonomic error typesasync-trait and futures for async abstractionsThis project is licensed under either of
at your option.
Contributions are welcome! Please feel free to submit a Pull Request.