| Crates.io | dynamo_table |
| lib.rs | dynamo_table |
| version | 0.1.0 |
| created_at | 2025-11-30 12:57:15.929806+00 |
| updated_at | 2025-11-30 12:57:15.929806+00 |
| description | A high-level DynamoDB table abstraction with get_item, query, update, filter, batch operations, pagination, and type-safe queries |
| homepage | |
| repository | https://github.com/quietscroll/dynamo-table |
| max_upload_size | |
| id | 1958197 |
| size | 333,651 |
A high-level, type-safe DynamoDB table abstraction for Rust with support for batch operations, pagination, Global Secondary Indexes, and more.
Built on:

- `serde` for automatic serialization
- `tokio` and `aws-sdk-dynamodb`

Add to your `Cargo.toml`:
[dependencies]
dynamo_table = "0.1"
aws-config = "1"
aws-sdk-dynamodb = "1"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
The client automatically initializes with sensible defaults on first use:
use dynamo_table::DynamoTable;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
user_id: String,
email: String,
name: String,
}
impl DynamoTable for User {
type PK = String;
type SK = String;
const TABLE: &'static str = "users";
const PARTITION_KEY: &'static str = "user_id";
const SORT_KEY: Option<&'static str> = None;
fn partition_key(&self) -> Self::PK {
self.user_id.clone()
}
}
#[tokio::main]
async fn main() -> Result<(), dynamo_table::Error> {
// No initialization needed! Just use the table methods directly
let user = User {
user_id: "user123".to_string(),
email: "user@example.com".to_string(),
name: "John Doe".to_string(),
};
user.add_item().await?;
let retrieved = User::get_item(&"user123".to_string(), None).await?;
println!("Retrieved: {:?}", retrieved);
Ok(())
}
Auto-initialization provides:
- LocalStack detection via the `AWS_PROFILE=localstack` environment variable

For production applications, customize the client configuration:
use dynamo_table::{init, defaults, BehaviorVersion, Region};
#[tokio::main]
async fn main() {
// Initialize once at application startup with custom config
let config = defaults(BehaviorVersion::latest())
.region(Region::new("us-west-2"))
.load()
.await;
init(&config).await;
// Now use your tables with the custom configuration!
}
Or with a custom client:
use dynamo_table::init_with_client;
#[tokio::main]
async fn main() {
let config = aws_config::load_from_env().await;
let client = aws_sdk_dynamodb::Client::new(&config);
init_with_client(client).await;
}
use dynamo_table::DynamoTable;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
user_id: String,
email: String,
name: String,
created_at: i64,
}
impl DynamoTable for User {
type PK = String; // Partition key type
type SK = String; // Sort key type (use String for no sort key)
const TABLE: &'static str = "users";
const PARTITION_KEY: &'static str = "user_id";
const SORT_KEY: Option<&'static str> = None;
fn partition_key(&self) -> Self::PK {
self.user_id.clone()
}
}
use dynamo_table::Error;
async fn crud_examples() -> Result<(), Error> {
// CREATE - Add an item
let user = User {
user_id: "user123".to_string(),
email: "user@example.com".to_string(),
name: "John Doe".to_string(),
created_at: 1234567890,
};
user.add_item().await?;
// READ - Get a single item
let retrieved = User::get_item(&"user123".to_string(), None).await?;
if let Some(user) = retrieved {
println!("Found user: {:?}", user);
}
// UPDATE - Update item fields
use serde_json::json;
let updates = json!({
"name": "Jane Doe",
"email": "jane@example.com"
});
user.update_item(updates).await?;
// DELETE - Remove an item
User::delete_item("user123".to_string(), None).await?;
// Or delete using the item itself
user.destroy_item().await?;
Ok(())
}
async fn query_examples() -> Result<(), Error> {
// Query all items with a partition key
let result = User::query_items(
&"user123".to_string(),
None, // No sort key filter
Some(10), // Limit to 10 items
None, // No pagination cursor
).await?;
for user in result.items {
println!("User: {:?}", user);
}
// Check if there are more results
if let Some(cursor) = result.last_evaluated_key {
println!("More results available, cursor: {:?}", cursor);
}
// Query a single item by partition key
let single = User::query_item(&"user123".to_string()).await?;
// Count items for a partition key
let count = User::count_items(&"user123".to_string()).await?;
println!("Found {} users", count);
Ok(())
}
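To drain every page, feed each returned last_evaluated_key back in as the next cursor, the same pattern as the paginated scan shown later. A minimal sketch, assuming the cursor can be passed back exactly as it is returned:

async fn paginate_all() -> Result<(), Error> {
    let mut cursor = None;
    let mut total = 0;
    loop {
        let page = User::query_items(
            &"user123".to_string(),
            None,
            Some(25),
            cursor,
        ).await?;
        total += page.items.len();
        for user in page.items {
            println!("User: {:?}", user);
        }
        // No last_evaluated_key means DynamoDB has returned the final page
        match page.last_evaluated_key {
            Some(key) => cursor = Some(key),
            None => break,
        }
    }
    println!("Drained {} users", total);
    Ok(())
}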
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Order {
user_id: String,
order_id: String,
total: f64,
status: String,
created_at: String,
}
impl DynamoTable for Order {
type PK = String;
type SK = String;
const TABLE: &'static str = "orders";
const PARTITION_KEY: &'static str = "user_id";
const SORT_KEY: Option<&'static str> = Some("order_id");
fn partition_key(&self) -> Self::PK {
self.user_id.clone()
}
fn sort_key(&self) -> Option<Self::SK> {
Some(self.order_id.clone())
}
}
async fn composite_key_examples() -> Result<(), Error> {
// Get all orders for a user
let orders = Order::query_items(
&"user123".to_string(),
None,
Some(20),
None,
).await?;
// Get specific order
let order = Order::get_item(
&"user123".to_string(),
Some(&"order456".to_string()),
).await?;
// Query with sort key pattern - orders beginning with "2024-"
let orders_2024 = Order::query_begins_with(
&"user123".to_string(),
"2024-",
Some(10),
None,
true, // ascending order
).await?;
// Query range - orders between two IDs
let orders_range = Order::query_between(
&"user123".to_string(),
"order100",
"order200",
Some(50),
None,
true,
).await?;
// Reverse query (newest first if sort key is timestamp)
let recent_orders = Order::reverse_query_items(
&"user123".to_string(),
None,
Some(5), // Last 5 orders
None,
).await?;
Ok(())
}
Use a Global Secondary Index (GSI) to query your data by alternate keys:
use dynamo_table::GSITable;
impl GSITable for User {
const GSI_PARTITION_KEY: &'static str = "email";
const GSI_SORT_KEY: Option<&'static str> = None;
fn gsi_partition_key(&self) -> String {
self.email.clone()
}
fn gsi_sort_key(&self) -> Option<String> {
None
}
}
async fn gsi_examples() -> Result<(), Error> {
// Query by email using GSI
let result = User::query_gsi_items(
"user@example.com".to_string(),
None,
Some(10),
None,
).await?;
// Query single item by GSI
let user = User::query_gsi_item(
"user@example.com".to_string(),
None,
).await?;
// Count items by GSI key
let count = User::count_gsi_items("user@example.com".to_string()).await?;
// Reverse query on GSI
let results = User::reverse_query_gsi_items(
"user@example.com".to_string(),
None,
Some(10),
None,
).await?;
Ok(())
}
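The trait also has slots for a GSI sort key. A sketch of a composite index on Order, assuming a hypothetical status-partitioned, created_at-sorted GSI has already been created on the table:

use dynamo_table::GSITable;

impl GSITable for Order {
    // Hypothetical "status + created_at" index; it must exist on the table
    const GSI_PARTITION_KEY: &'static str = "status";
    const GSI_SORT_KEY: Option<&'static str> = Some("created_at");
    fn gsi_partition_key(&self) -> String {
        self.status.clone()
    }
    fn gsi_sort_key(&self) -> Option<String> {
        Some(self.created_at.clone())
    }
}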
Efficiently process multiple items in a single request:
async fn batch_examples() -> Result<(), Error> {
// Batch Get - Retrieve multiple items
let keys = vec![
("user1".to_string(), None),
("user2".to_string(), None),
("user3".to_string(), None),
];
let result = User::batch_get(keys).await?;
println!("Retrieved {} items", result.items.len());
if !result.failed_keys.is_empty() {
println!("Failed to retrieve: {:?}", result.failed_keys);
}
// Batch Write - Insert/update multiple items
let new_users = vec![user1, user2, user3];
let write_result = User::batch_upsert(new_users).await?;
println!("Wrote {} items in {:?}",
write_result.items.len(),
write_result.execution_time
);
// Batch Delete - Remove multiple items
let users_to_delete = vec![user1, user2];
let delete_result = User::batch_delete(users_to_delete).await?;
// Mixed batch - Write and delete in same call
let batch_result = User::batch_write(
vec![user_to_write1, user_to_write2], // Items to put
vec![user_to_delete1, user_to_delete2], // Items to delete
).await?;
Ok(())
}
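DynamoDB's BatchWriteItem API caps each request at 25 items. If the crate does not already chunk oversized batches internally (not verified here), a sketch that splits writes accordingly:

async fn batch_upsert_chunked(users: Vec<User>) -> Result<(), Error> {
    // BatchWriteItem accepts at most 25 put/delete requests per call
    for chunk in users.chunks(25) {
        let result = User::batch_upsert(chunk.to_vec()).await?;
        println!("Wrote {} items", result.items.len());
    }
    Ok(())
}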
Use streams to process large datasets without loading everything into memory:
use futures_util::StreamExt;
use serde_json::json;
async fn streaming_examples() -> Result<(), Error> {
// Stream query results
let stream = User::query_stream(
&"user123".to_string(),
None,
Some(100), // Page size
);
tokio::pin!(stream);
let mut count = 0;
while let Some(result) = stream.next().await {
match result {
Ok(user) => {
println!("Processing user: {:?}", user);
count += 1;
}
Err(e) => eprintln!("Error: {}", e),
}
}
println!("Processed {} users total", count);
// Stream with filter
let stream_with_filter = User::query_stream_with_filter(
&"user123".to_string(),
None,
Some(50),
"status = :active".to_string(),
json!({ ":active": "active" }),
);
Ok(())
}
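Because query_stream yields a futures Stream, the usual StreamExt combinators apply. A short sketch that keeps the first ten successfully deserialized users, assuming the stream items are Result<User, Error> as in the example above:

use futures_util::StreamExt;

async fn first_ten_users() -> Result<Vec<User>, Error> {
    // Drop items that failed to deserialize and stop after ten successes
    let users: Vec<User> = User::query_stream(&"user123".to_string(), None, Some(100))
        .filter_map(|result| async move { result.ok() })
        .take(10)
        .collect()
        .await;
    Ok(users)
}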
Apply filters to query results:
use serde_json::json;
async fn filter_examples() -> Result<(), Error> {
// Query with filter
let active_users = User::query_items_with_filter(
&"user123".to_string(),
None,
Some(50),
None,
"status = :status AND created_at > :timestamp".to_string(),
json!({
":status": "active",
":timestamp": 1700000000
}),
).await?;
// Scan with filter
let premium_users = User::scan_items_with_filter(
Some(100),
None,
"subscription_tier = :tier".to_string(),
json!({ ":tier": "premium" }),
).await?;
// GSI query with filter
let filtered_gsi = User::query_gsi_items_with_filter(
"user@example.com".to_string(),
None,
None,
Some(20),
true,
"account_status = :status".to_string(),
json!({ ":status": "verified" }),
).await?;
Ok(())
}
Prevent race conditions with conditional expressions:
use serde_json::json;
async fn conditional_update_examples() -> Result<(), Error> {
// Only update if item exists and status is pending
let updates = json!({
"status": "active",
"updated_at": 1234567890
});
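    // Note: `user` here is an item loaded earlier (e.g. via get_item). "STATUS"
    // is a DynamoDB reserved word, hence the #status placeholder below; this
    // assumes the library wires up the expression attribute name mapping.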
let result = user.update_item_with_condition(
updates,
Some("attribute_exists(user_id) AND #status = :old_status".to_string()),
Some(json!({ ":old_status": "pending" })),
).await;
match result {
Ok(_) => println!("Updated successfully"),
Err(e) if e.is_conditional_check_failed() => {
println!("Condition failed - concurrent modification or wrong state");
}
Err(e) => return Err(e),
}
// Prevent overwriting existing items
let new_user = User {
user_id: "newuser".to_string(),
email: "new@example.com".to_string(),
name: "New User".to_string(),
};
match new_user.add_item_with_condition(
Some("attribute_not_exists(user_id)".to_string()),
None,
).await {
Ok(_) => println!("Created new user"),
Err(e) if e.is_conditional_check_failed() => {
println!("User already exists");
}
Err(e) => return Err(e),
}
Ok(())
}
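Building on this, a sketch of optimistic locking with an explicit version attribute. The item_version attribute is hypothetical; it would need to live on your struct and be written on every update:

use serde_json::json;

async fn optimistic_update(user: &User, new_name: &str, seen_version: i64) -> Result<(), Error> {
    // The write succeeds only if nobody bumped item_version since we read it
    let updates = json!({
        "name": new_name,
        "item_version": seen_version + 1
    });
    match user.update_item_with_condition(
        updates,
        Some("item_version = :seen".to_string()),
        Some(json!({ ":seen": seen_version })),
    ).await {
        Ok(_) => Ok(()),
        Err(e) if e.is_conditional_check_failed() => {
            // Lost the race: reload the item and retry with the fresh version
            println!("Stale item_version; reload and retry");
            Ok(())
        }
        Err(e) => Err(e),
    }
}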
Increment numeric fields atomically:
async fn counter_examples() -> Result<(), Error> {
// Increment single field
User::increment_multiple(
&"user123".to_string(),
None,
&[("login_count", 1)],
).await?;
// Increment multiple fields atomically
User::increment_multiple(
&"user123".to_string(),
None,
&[
("login_count", 1),
("points", 10),
("streak", 1),
],
).await?;
Ok(())
}
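Assuming the delta parameter accepts negative values (not verified against the crate's signature), the same call can decrement:

async fn spend_points() -> Result<(), Error> {
    // A negative delta subtracts atomically
    User::increment_multiple(
        &"user123".to_string(),
        None,
        &[("points", -10)],
    ).await?;
    Ok(())
}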
Use scans sparingly for full table operations:
async fn scan_examples() -> Result<(), Error> {
// Basic scan
let result = User::scan_items(Some(100), None).await?;
for user in result.items {
println!("User: {:?}", user);
}
// Paginated scan
let mut cursor = None;
loop {
let result = User::scan_items(Some(50), cursor).await?;
// Process items
for user in result.items {
println!("Processing: {:?}", user);
}
// Check for more pages
match result.last_evaluated_key {
Some(key) => cursor = Some(key),
None => break,
}
}
Ok(())
}
Override defaults for individual tables:
use std::time::Duration;
use dynamo_table::RetryConfig;
impl DynamoTable for User {
// Required fields...
type PK = String;
type SK = String;
const TABLE: &'static str = "users";
const PARTITION_KEY: &'static str = "user_id";
const SORT_KEY: Option<&'static str> = None;
fn partition_key(&self) -> Self::PK {
self.user_id.clone()
}
// Optional: Custom page size (default: 10)
const DEFAULT_PAGE_SIZE: u16 = 50;
// Optional: Custom retry configuration (default: 2 retries)
const BATCH_RETRIES_CONFIG: RetryConfig = RetryConfig {
max_retries: 3,
initial_delay: Duration::from_millis(200),
max_delay: Duration::from_secs(5),
};
}
use dynamo_table::{RetryConfig, TimeoutConfig};
use std::time::Duration;
#[tokio::main]
async fn main() {
let timeout_config = TimeoutConfig::builder()
.connect_timeout(Duration::from_secs(5))
.read_timeout(Duration::from_secs(30))
.operation_timeout(Duration::from_secs(90))
.build();
let retry_config = RetryConfig::standard()
.with_max_attempts(5)
.with_initial_backoff(Duration::from_millis(500));
let config = dynamo_table::defaults(dynamo_table::BehaviorVersion::latest())
.region(dynamo_table::Region::new("us-east-1"))
.retry_config(retry_config)
.timeout_config(timeout_config)
.load()
.await;
dynamo_table::init(&config).await;
}
The library automatically detects LocalStack when `AWS_PROFILE=localstack` is set:
export AWS_PROFILE=localstack
cargo test
#[cfg(test)]
mod tests {
use super::*;
async fn test_client() {
let config = aws_config::from_env()
.endpoint_url("http://localhost:4566")
.load()
.await;
let client = aws_sdk_dynamodb::Client::new(&config);
dynamo_table::init_with_client(client).await;
}
#[tokio::test]
async fn test_user_operations() {
test_client().await;
let user = User {
user_id: "test_user".to_string(),
email: "test@example.com".to_string(),
name: "Test User".to_string(),
};
user.add_item().await.unwrap();
let retrieved = User::get_item(&user.user_id, None).await.unwrap();
assert!(retrieved.is_some());
}
}
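Tests assume the table already exists in LocalStack. A sketch that creates the users table with the raw SDK; the key schema matches the User definition above, while pay-per-request billing is just a convenient default for local testing:

use aws_sdk_dynamodb::types::{
    AttributeDefinition, BillingMode, KeySchemaElement, KeyType, ScalarAttributeType,
};

async fn create_users_table(
    client: &aws_sdk_dynamodb::Client,
) -> Result<(), Box<dyn std::error::Error>> {
    client
        .create_table()
        .table_name("users")
        .attribute_definitions(
            AttributeDefinition::builder()
                .attribute_name("user_id")
                .attribute_type(ScalarAttributeType::S)
                .build()?,
        )
        .key_schema(
            KeySchemaElement::builder()
                .attribute_name("user_id")
                .key_type(KeyType::Hash)
                .build()?,
        )
        .billing_mode(BillingMode::PayPerRequest)
        .send()
        .await?;
    Ok(())
}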
The library provides a comprehensive Error type with helper methods:
use dynamo_table::Error;
async fn error_handling_examples(user: User, updates: serde_json::Value) -> Result<(), Error> {
    match user.update_item(updates).await {
Ok(output) => println!("Success: {:?}", output),
Err(e) if e.is_conditional_check_failed() => {
// Handle optimistic locking failure
println!("Concurrent modification detected");
}
Err(e) if e.is_serialization_error() => {
// Handle serialization issues
eprintln!("Data serialization failed: {}", e);
}
Err(e) if e.is_dynamodb_error() => {
// Handle DynamoDB service errors
eprintln!("DynamoDB error: {}", e);
}
Err(e) => return Err(e),
}
Ok(())
}
Basic Operations:
- `add_item()` - Put an item into the table
- `get_item(pk, sk)` - Retrieve a single item
- `delete_item(pk, sk)` - Delete an item by key
- `destroy_item(self)` - Delete using the item itself
- `update_item(updates)` - Update an item
- `update_item_with_condition(...)` - Conditional update

Query Operations:

- `query_items(pk, sk, limit, cursor)` - Query items by partition key
- `query_item(pk)` - Query single item
- `reverse_query_items(...)` - Query in descending order
- `query_items_with_filter(...)` - Query with filter expression
- `query_stream(...)` - Stream query results
- `query_begins_with(...)` - Query with sort key prefix
- `query_between(...)` - Query sort key range
- `count_items(pk)` - Count items for partition key

Batch Operations:

- `batch_get(keys)` - Batch get multiple items
- `batch_upsert(items)` - Batch write multiple items
- `batch_delete(items)` - Batch delete multiple items

Scan Operations:

- `scan_items(limit, cursor)` - Scan the table
- `scan_items_with_filter(...)` - Scan with filter

Utility:

- `increment_multiple(pk, sk, fields)` - Atomic counter operations
- `dynamodb_client()` - Get client for this table (can be overridden)

GSI Operations:

- `query_gsi_items(...)` - Query using Global Secondary Index
- `query_gsi_item(...)` - Query single item by GSI
- `reverse_query_gsi_items(...)` - Reverse query on GSI
- `query_gsi_items_with_filter(...)` - Query GSI with filter
- `count_gsi_items(...)` - Count items by GSI key

See the examples/ directory for more comprehensive examples:
- `batch_write_with_retry.rs` - Advanced batch operation patterns

Licensed under either of:
at your option.
Contributions are welcome! Please feel free to submit a Pull Request.
This library was extracted from a production application's storage layer and represents battle-tested patterns for DynamoDB access in Rust. It emphasizes type safety, ergonomics, and production-ready defaults while maintaining flexibility for custom configurations.