aws_utils_kinesis_data_streams

  • Version: 0.3.0
  • Description: AWS Kinesis Data Streams utilities for Rust
  • Repository: https://github.com/UniqueVision/utilities.aws-utils
  • Author: Kouhei Aoyagi (aoyagikouhei)
  • Created: 2025-07-10
  • Updated: 2025-09-16


Kinesis Data Streams

A Rust library providing utilities for AWS Kinesis Data Streams operations with built-in retry logic and batch processing capabilities.

Features

  • Simple API: Easy-to-use functions for putting records to Kinesis Data Streams
  • Batch Processing: Efficient batch record operations with automatic size and count validation
  • Records Builder: Builder pattern for constructing batches of records with size constraints
  • Error Handling: Comprehensive error handling with custom error types
  • Retry Logic: Built-in retry mechanisms for handling transient failures
  • AWS SDK Integration: Built on top of the official AWS SDK for Rust
  • Testing Support: Comprehensive unit tests with mocking capabilities

Installation

Add this to your Cargo.toml:

[dependencies]
aws_utils_kinesis_data_streams = "0.3.0"

Usage

Basic Usage

use kinesis_data_streams::make_client_with_timeout_default;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a Kinesis client with default timeout settings
    let client = make_client_with_timeout_default(None).await;
    
    // Put a single record
    let result = kinesis_data_streams::add_record(
        &client,
        "my-stream",
        "partition-key",
        "Hello, Kinesis!".to_string(),
    ).await?;
    
    println!("Record added with sequence number: {}", result.sequence_number());
    
    Ok(())
}

Batch Processing with RecordsBuilder

use kinesis_data_streams::{make_client_with_timeout_default, RecordsBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;
    
    // Build a batch of records
    let mut builder = RecordsBuilder::new();
    builder.add_entry_data("Record 1".to_string())?;
    builder.add_entry_data("Record 2".to_string())?;
    builder.add_entry("Record 3".to_string(), Some("custom-partition".to_string()), None)?;
    
    // Send the batch
    let records = builder.build();
    let result = kinesis_data_streams::add_records(&client, "my-stream", records).await?;
    
    println!("Batch sent with {} failed records", result.failed_record_count().unwrap_or(0));
    
    Ok(())
}

Custom Endpoint (for testing)

use kinesis_data_streams::make_client_with_timeout_default;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use a custom endpoint (e.g., for LocalStack)
    let client = make_client_with_timeout_default(Some("http://localhost:4566".to_string())).await;
    
    // Your Kinesis operations here...
    
    Ok(())
}

Timeout Configuration

use kinesis_data_streams::{make_client, make_client_with_timeout, make_client_with_timeout_default};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use default timeout settings (recommended)
    let client = make_client_with_timeout_default(None).await;
    
    // Use custom timeout settings
    let client = make_client_with_timeout(
        None, // endpoint_url
        Some(Duration::from_secs(3100)), // connect_timeout
        Some(Duration::from_secs(60)),   // operation_timeout
        Some(Duration::from_secs(55)),   // operation_attempt_timeout
        Some(Duration::from_secs(50)),   // read_timeout
    ).await;
    
    // Use legacy client without timeout configuration
    let client = make_client(None, None).await;
    
    Ok(())
}

API Reference

Functions

  • make_client_with_timeout_default(endpoint_url: Option<String>) - Creates a Kinesis client with default timeout settings
  • make_client_with_timeout(endpoint_url, connect_timeout, operation_timeout, operation_attempt_timeout, read_timeout) - Creates a Kinesis client with custom timeout settings
  • make_client(endpoint_url: Option<String>, timeout_config: Option<TimeoutConfig>) - Creates a Kinesis client with optional custom endpoint and timeout configuration
  • kinesis_data_streams::add_record(client, stream_name, partition_key, data) - Puts a single record
  • kinesis_data_streams::add_records(client, stream_name, records) - Puts multiple records in batch

RecordsBuilder

A builder for creating batches of records with automatic size validation:

  • new() - Creates a new builder with default AWS limits
  • new_with_limit(single_limit, total_limit, record_limit) - Creates a builder with custom limits
  • add_entry_data(data) - Adds a record with auto-generated partition key
  • add_entry(data, partition_key, explicit_hash_key) - Adds a record with custom keys
  • build() - Builds the final vector of records
  • len() - Returns the number of records
  • is_empty() - Checks if the builder is empty
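The size accounting described above can be sketched with plain std types. `SketchBuilder` below is an illustrative stand-in mirroring the documented `new_with_limit` / `add_entry_data` behavior, not the crate's actual implementation:

```rust
// Illustrative sketch of RecordsBuilder-style limit checking (std only).
// Limits are in bytes for single records and the running batch total,
// plus a maximum record count per batch.
struct SketchBuilder {
    entries: Vec<Vec<u8>>,
    total_bytes: usize,
    single_limit: usize, // per-record byte limit (1 MB by default)
    total_limit: usize,  // per-batch byte limit (5 MB by default)
    record_limit: usize, // per-batch record count (500 by default)
}

impl SketchBuilder {
    fn new() -> Self {
        Self::new_with_limit(1024 * 1024, 5 * 1024 * 1024, 500)
    }

    fn new_with_limit(single_limit: usize, total_limit: usize, record_limit: usize) -> Self {
        Self { entries: Vec::new(), total_bytes: 0, single_limit, total_limit, record_limit }
    }

    fn add_entry_data(&mut self, data: String) -> Result<(), String> {
        let bytes = data.into_bytes();
        if bytes.len() > self.single_limit {
            // Analogous to Error::EntryOverItem: the record itself is too big.
            return Err("EntryOverItem: record exceeds the per-record limit".into());
        }
        if self.entries.len() == self.record_limit
            || self.total_bytes + bytes.len() > self.total_limit
        {
            // Analogous to Error::EntryOverAll: send the current batch first.
            return Err("EntryOverAll: batch limits reached".into());
        }
        self.total_bytes += bytes.len();
        self.entries.push(bytes);
        Ok(())
    }

    fn len(&self) -> usize { self.entries.len() }
    fn is_empty(&self) -> bool { self.entries.is_empty() }
}
```

In this scheme `EntryOverAll` is recoverable: the rejected record is not added, so the caller can send the accumulated batch and retry the record against a fresh builder.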

Error Handling

The library provides comprehensive error handling through the Error enum using the thiserror crate:

#[derive(Error, Debug)]
pub enum Error {
    #[error(transparent)]
    BuildError(#[from] Box<aws_sdk_kinesis::error::BuildError>),
    
    #[error("EntryOverAll {0}")]
    EntryOverAll(String),
    
    #[error("EntryOverItem {0}")]
    EntryOverItem(String),
    
    #[error(transparent)]
    AwsSdk(#[from] Box<aws_sdk_kinesis::Error>),
}

Error variants:

  • BuildError - Errors when building AWS SDK request entries
  • EntryOverItem - Individual record exceeds the 1MB size limit
  • EntryOverAll - Adding a record would exceed batch limits (5MB total or 500 records)
  • AwsSdk - General AWS SDK errors (network issues, authentication, etc.)

Error Handling Example

use kinesis_data_streams::{make_client_with_timeout_default, RecordsBuilder, error::Error};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = make_client_with_timeout_default(None).await;
    
    match kinesis_data_streams::add_record(&client, "my-stream", "key", "data".to_string()).await {
        Ok(output) => println!("Success: {}", output.sequence_number()),
        Err(Error::AwsSdk(e)) => {
            // Handle AWS SDK errors (e.g., stream not found, throttling)
            eprintln!("AWS error: {}", e);
        }
        Err(e) => eprintln!("Other error: {}", e),
    }
    
    // Batch operations with size limit handling
    let mut builder = RecordsBuilder::new();
    match builder.add_entry_data("Large data...".to_string()) {
        Ok(()) => println!("Record added to batch"),
        Err(Error::EntryOverItem(msg)) => {
            // Single record too large
            eprintln!("Record too large: {}", msg);
        }
        Err(Error::EntryOverAll(msg)) => {
            // Batch is full, need to send current batch
            eprintln!("Batch full: {}", msg);
        }
        Err(e) => eprintln!("Unexpected error: {}", e),
    }
    
    Ok(())
}

AWS Kinesis Limits

The library respects AWS Kinesis Data Streams limits:

  • Single Record: Maximum 1MB per record
  • Batch Operation: Maximum 5MB total payload and 500 records per batch
  • Partition Key: Maximum 256 UTF-8 characters

These limits are enforced by the RecordsBuilder to prevent API errors.
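When a payload exceeds one batch, these limits imply a splitting step before calling the batch API. The std-only helper below is an illustrative sketch (`split_into_batches` and its constants are not part of the crate's API); note that in the real service, partition-key bytes also count toward the 1 MB and 5 MB totals:

```rust
// Sketch: split raw record payloads into batches that respect the
// documented Kinesis maxima (500 records and 5 MB per batch, 1 MB per record).
const MAX_RECORDS_PER_BATCH: usize = 500;
const MAX_BATCH_BYTES: usize = 5 * 1024 * 1024;
const MAX_RECORD_BYTES: usize = 1024 * 1024;

fn split_into_batches(records: Vec<Vec<u8>>) -> Result<Vec<Vec<Vec<u8>>>, String> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0usize;
    for r in records {
        if r.len() > MAX_RECORD_BYTES {
            // No batch arrangement can make an oversized record legal.
            return Err(format!("record of {} bytes exceeds the 1 MB limit", r.len()));
        }
        if current.len() == MAX_RECORDS_PER_BATCH || current_bytes + r.len() > MAX_BATCH_BYTES {
            // Close out the current batch and start a new one.
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += r.len();
        current.push(r);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    Ok(batches)
}
```

Each resulting batch could then be sent with one `add_records` call, checking `failed_record_count` after each send.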

Testing

Run the test suite:

cargo test

For integration tests with specific environment variables:

RUST_LOG=info REALM_CODE=test cargo test test_kinesis_data_streams_records -- --nocapture --test-threads=1

The library includes comprehensive unit tests with mocking capabilities using mockito for testing without actual AWS resources.

Configuration

Authentication

The client uses the AWS SDK's default credential chain for authentication:

  • Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)
  • ECS task role (for Fargate/ECS)
  • EC2 instance profile
  • AWS credentials file
  • Other configured credential providers

Dependencies

  • aws-config - AWS configuration management
  • aws-sdk-kinesis - Official AWS Kinesis SDK
  • thiserror - Error handling
  • tracing - Logging and tracing
  • uuid - UUID generation for partition keys

License

This project is licensed under either of

  • Apache License, Version 2.0
  • MIT License

at your option.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
