go-zoom-kinesis

Crates.iogo-zoom-kinesis
lib.rsgo-zoom-kinesis
version
sourcesrc
created_at2024-10-29 14:38:35.647284
updated_at2024-12-11 15:59:26.563706
descriptionA robust AWS Kinesis stream processor with checkpointing and retry capabilities
homepage
repositoryhttps://github.com/cgorski/go-zoom-kinesis
max_upload_size
id1427083
Cargo.toml error:TOML parse error at line 18, column 1 | 18 | autolib = false | ^^^^^^^ unknown field `autolib`, expected one of `name`, `version`, `edition`, `authors`, `description`, `readme`, `license`, `repository`, `homepage`, `documentation`, `build`, `resolver`, `links`, `default-run`, `default_dash_run`, `rust-version`, `rust_dash_version`, `rust_version`, `license-file`, `license_dash_file`, `license_file`, `licenseFile`, `license_capital_file`, `forced-target`, `forced_dash_target`, `autobins`, `autotests`, `autoexamples`, `autobenches`, `publish`, `metadata`, `keywords`, `categories`, `exclude`, `include`
size0
Christopher A Gorski (cgorski)

documentation

https://docs.rs/go-zoom-kinesis

README

go-zoom-kinesis 🐊

CI codecov Crates.io Documentation License: MIT

A robust, production-ready AWS Kinesis stream processor with checkpointing and retry capabilities. Built with reliability and performance in mind.

Features 🚀

  • âœĻ Automatic checkpointing with multiple storage backends
  • 🔄 Configurable retry logic with exponential backoff
  • 🛞ïļ Comprehensive error handling
  • 😊 Multiple shard processing
  • ðŸ•Ĩ DynamoDB checkpoint storage support
  • 📘 Detailed tracing and monitoring
  • ðŸ“Ķ Graceful shutdown handling
  • ðŸĒŠ Production-ready with extensive test coverage
  • 🎧 Configurable stream position initialization
  • 🔄 Smart checkpoint recovery with fallback options

Basic Usage 📓

use go_zoom_kinesis::{
    KinesisProcessor, ProcessorConfig, RecordProcessor,
    processor::RecordMetadata, processor::InitialPosition,
    store::InMemoryCheckpointStore,
    monitoring::MonitoringConfig,
    error::{ProcessorError, ProcessingError},
};
use aws_sdk_kinesis::{Client, types::Record};
use std::time::Duration;
use async_trait::async_trait;

#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    type Item = ();

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> Result<Option<Self::Item>, ProcessingError> {
        println!("Processing record: {:?}", record);
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), ProcessorError> {
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    let config = ProcessorConfig {
        stream_name: "my-stream".to_string(),
        batch_size: 100,
        api_timeout: Duration::from_secs(30),
        processing_timeout: Duration::from_secs(300),
        max_retries: Some(3),
        shard_refresh_interval: Duration::from_secs(60),
        initial_position: InitialPosition::TrimHorizon,
        prefer_stored_checkpoint: true,
        monitoring: MonitoringConfig {
            enabled: true,
            ..Default::default()
        },
        ..Default::default()
    };

    let processor = MyProcessor;
    let store = InMemoryCheckpointStore::new();

    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let (processor, _monitoring_rx) = KinesisProcessor::new(
        config,
        processor,
        client,
        store,
    );

    processor.run(shutdown_rx).await
}

Contributing 😊

Contributions are welcome! Please feel free to submit a Pull Request.

License 📒

This project is licensed under the MIT License - see the LICENSE file for details.

Support 🔠

If you have any questions or run into issues, please open an issue on GitHub.

Commit count: 77

cargo fmt