Crates.io | go-zoom-kinesis |
lib.rs | go-zoom-kinesis |
version | |
source | src |
created_at | 2024-10-29 14:38:35.647284 |
updated_at | 2024-12-11 15:59:26.563706 |
description | A robust AWS Kinesis stream processor with checkpointing and retry capabilities |
homepage | |
repository | https://github.com/cgorski/go-zoom-kinesis |
max_upload_size | |
id | 1427083 |
size | 0 |
A robust, production-ready AWS Kinesis stream processor with checkpointing and retry capabilities. Built with reliability and performance in mind.
use go_zoom_kinesis::{
    KinesisProcessor, ProcessorConfig, RecordProcessor,
    processor::RecordMetadata, processor::InitialPosition,
    store::InMemoryCheckpointStore,
    monitoring::MonitoringConfig,
    error::{ProcessorError, ProcessingError},
};
use aws_sdk_kinesis::{Client, types::Record};
use std::time::Duration;
use async_trait::async_trait;

#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    type Item = ();

    // Called once per record; this example only logs the record.
    async fn process_record<'a>(
        &self,
        record: &'a Record,
        _metadata: RecordMetadata<'a>,
    ) -> Result<Option<Self::Item>, ProcessingError> {
        println!("Processing record: {:?}", record);
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), ProcessorError> {
    // Load AWS credentials and region from the environment.
    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&sdk_config);

    let config = ProcessorConfig {
        stream_name: "my-stream".to_string(),
        batch_size: 100,
        api_timeout: Duration::from_secs(30),
        processing_timeout: Duration::from_secs(300),
        max_retries: Some(3),
        shard_refresh_interval: Duration::from_secs(60),
        initial_position: InitialPosition::TrimHorizon,
        prefer_stored_checkpoint: true,
        monitoring: MonitoringConfig {
            enabled: true,
            ..Default::default()
        },
        ..Default::default()
    };

    let processor = MyProcessor;
    let store = InMemoryCheckpointStore::new();

    // Keep the sender alive for the lifetime of `main`; send `true` on it to request shutdown.
    let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    let (processor, _monitoring_rx) = KinesisProcessor::new(
        config,
        processor,
        client,
        store,
    );

    processor.run(shutdown_rx).await
}
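The example above sets `max_retries` and passes in a checkpoint store. To give a rough idea of how those two concepts usually interact, here is a minimal, standard-library-only sketch. It is not the crate's implementation. `CheckpointStore`, `process_shard`, and the use of `u64` sequence numbers are hypothetical names and simplifications chosen for illustration (real Kinesis sequence numbers are strings), and the retry policy shown (one initial attempt plus up to `max_retries` retries, stopping the shard on exhaustion) is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory checkpoint store: shard id -> last processed sequence number.
#[derive(Default)]
struct CheckpointStore {
    checkpoints: HashMap<String, u64>,
}

impl CheckpointStore {
    fn get(&self, shard: &str) -> Option<u64> {
        self.checkpoints.get(shard).copied()
    }

    fn save(&mut self, shard: &str, seq: u64) {
        self.checkpoints.insert(shard.to_string(), seq);
    }
}

/// Processes `records` (sequence number, payload) in order.
/// Records at or before the stored checkpoint are skipped.
/// Each record gets one initial attempt plus up to `max_retries` retries.
/// Returns the number of records processed, or `Err(seq)` for the record
/// that exhausted its retries; the checkpoint is never advanced past a failure.
fn process_shard<F>(
    store: &mut CheckpointStore,
    shard: &str,
    records: &[(u64, &str)],
    max_retries: u32,
    mut handler: F,
) -> Result<usize, u64>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let resume_after = store.get(shard);
    let mut processed = 0;

    for &(seq, payload) in records {
        // Skip anything already covered by the checkpoint.
        if resume_after.map_or(false, |last| seq <= last) {
            continue;
        }

        let mut attempt = 0;
        loop {
            match handler(payload) {
                Ok(()) => break,
                Err(_) if attempt < max_retries => attempt += 1,
                Err(_) => return Err(seq),
            }
        }

        // Checkpoint only after the record has been handled successfully.
        store.save(shard, seq);
        processed += 1;
    }

    Ok(processed)
}
```

Because the checkpoint is written only after a record succeeds, a restart resumes at the first record that was not processed, which gives at-least-once delivery in this sketch.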
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.
If you have any questions or run into issues, please open an issue on GitHub.