| Crates.io | aws_utils_kinesis_data_streams |
| lib.rs | aws_utils_kinesis_data_streams |
| version | 0.3.0 |
| created_at | 2025-07-10 09:17:23.752313+00 |
| updated_at | 2025-09-16 23:39:44.916949+00 |
| description | AWS Kinesis Data Streams utilities for Rust |
| homepage | https://github.com/UniqueVision/utilities.aws-utils |
| repository | https://github.com/UniqueVision/utilities.aws-utils |
| max_upload_size | |
| id | 1746065 |
| size | 93,110 |
A Rust library providing utilities for AWS Kinesis Data Streams operations with built-in retry logic and batch processing capabilities.
Add this to your Cargo.toml:
[dependencies]
kinesis_data_streams = "0.1.0"
use kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_stream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a Kinesis client with default timeout settings
let client = make_client_with_timeout_default(None).await;
// Put a single record
let result = kinesis_data_streams::add_record(
&client,
"my-stream",
"partition-key",
"Hello, Kinesis!".to_string(),
).await?;
println!("Record added with sequence number: {}", result.sequence_number());
Ok(())
}
use kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_stream, RecordsBuilder};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = make_client_with_timeout_default(None).await;
// Build a batch of records
let mut builder = RecordsBuilder::new();
builder.add_entry_data("Record 1".to_string())?;
builder.add_entry_data("Record 2".to_string())?;
builder.add_entry("Record 3".to_string(), Some("custom-partition".to_string()), None)?;
// Send the batch
let records = builder.build();
let result = kinesis_data_streams::add_records(&client, "my-stream", records).await?;
println!("Batch sent with {} failed records", result.failed_record_count().unwrap_or(0));
Ok(())
}
use kinesis_data_streams::make_client_with_timeout_default;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Use a custom endpoint (e.g., for LocalStack)
let client = make_client_with_timeout_default(Some("http://localhost:4566".to_string())).await;
// Your Kinesis operations here...
Ok(())
}
use kinesis_data_streams::{make_client, make_client_with_timeout, make_client_with_timeout_default};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Use default timeout settings (recommended)
let client = make_client_with_timeout_default(None).await;
// Use custom timeout settings
let client = make_client_with_timeout(
None, // endpoint_url
Some(Duration::from_secs(3100)), // connect_timeout
Some(Duration::from_secs(60)), // operation_timeout
Some(Duration::from_secs(55)), // operation_attempt_timeout
Some(Duration::from_secs(50)), // read_timeout
).await;
// Use legacy client without timeout configuration
let client = make_client(None, None).await;
Ok(())
}
make_client_with_timeout_default(endpoint_url: Option<String>) - Creates a Kinesis client with default timeout settingsmake_client_with_timeout(endpoint_url, connect_timeout, operation_timeout, operation_attempt_timeout, read_timeout) - Creates a Kinesis client with custom timeout settingsmake_client(endpoint_url: Option<String>, timeout_config: Option<TimeoutConfig>) - Creates a Kinesis client with optional custom endpoint and timeout configurationkinesis_data_streams::add_record(client, stream_name, partition_key, data) - Puts a single recordkinesis_data_streams::add_records(client, stream_name, records) - Puts multiple records in batchA builder for creating batches of records with automatic size validation:
new() - Creates a new builder with default AWS limitsnew_with_limit(single_limit, total_limit, record_limit) - Creates a builder with custom limitsadd_entry_data(data) - Adds a record with auto-generated partition keyadd_entry(data, partition_key, explicit_hash_key) - Adds a record with custom keysbuild() - Builds the final vector of recordslen() - Returns the number of recordsis_empty() - Checks if the builder is emptyThe library provides comprehensive error handling through the Error enum using the thiserror crate:
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
BuildError(#[from] Box<aws_sdk_kinesis::error::BuildError>),
#[error("EntryOverAll {0}")]
EntryOverAll(String),
#[error("EntryOverItem {0}")]
EntryOverItem(String),
#[error(transparent)]
AwsSdk(#[from] Box<aws_sdk_kinesis::Error>),
}
Error variants:
BuildError - Errors when building AWS SDK request entriesEntryOverItem - Individual record exceeds the 1MB size limitEntryOverAll - Adding a record would exceed batch limits (5MB total or 500 records)AwsSdk - General AWS SDK errors (network issues, authentication, etc.)use kinesis_data_streams::{make_client_with_timeout_default, kinesis_data_stream, RecordsBuilder, error::Error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = make_client_with_timeout_default(None).await;
match kinesis_data_streams::add_record(&client, "my-stream", "key", "data").await {
Ok(output) => println!("Success: {}", output.sequence_number()),
Err(Error::AwsSdk(e)) => {
// Handle AWS SDK errors (e.g., stream not found, throttling)
eprintln!("AWS error: {}", e);
}
Err(e) => eprintln!("Other error: {}", e),
}
// Batch operations with size limit handling
let mut builder = RecordsBuilder::new();
match builder.add_entry_data("Large data...".to_string()) {
Ok(()) => println!("Record added to batch"),
Err(Error::EntryOverItem(msg)) => {
// Single record too large
eprintln!("Record too large: {}", msg);
}
Err(Error::EntryOverAll(msg)) => {
// Batch is full, need to send current batch
eprintln!("Batch full: {}", msg);
}
Err(e) => eprintln!("Unexpected error: {}", e),
}
Ok(())
}
The library respects AWS Kinesis Data Streams limits:
These limits are enforced by the RecordsBuilder to prevent API errors.
Run the test suite:
cargo test
For integration tests with specific environment variables:
RUST_LOG=info REALM_CODE=test cargo test test_kinesis_data_streams_records -- --nocapture --test-threads=1
The library includes comprehensive unit tests with mocking capabilities using mockito for testing without actual AWS resources.
The client uses the AWS SDK's default credential chain for authentication:
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)aws-config - AWS configuration managementaws-sdk-kinesis - Official AWS Kinesis SDKthiserror - Error handlingtracing - Logging and tracinguuid - UUID generation for partition keysThis project is licensed under either of
at your option.
Contributions are welcome! Please feel free to submit a Pull Request.