kcl-async

version: 0.1.0
created_at: 2025-07-13 19:42:19.720448+00
updated_at: 2025-07-13 19:42:19.720448+00
description: Async KCL MultiLang Daemon library
homepage: https://github.com/Shemnei/kcl-async
repository: https://github.com/Shemnei/kcl-async
documentation: https://github.com/Shemnei/kcl-async
id: 1750742
size: 34,265
Jonas Grawe (Shemnei)

Caution: This crate is under development and provides no guarantees of compatibility or future updates. Use at your own risk.

Rust async library for AWS Kinesis Client Library (KCL) consumers using the MultiLangDaemon interface.

Usage

Examples

See ./examples for a full usage example of this crate.

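// Imports for the kcl-async items used below (Processor, Transport, Checkpointer,
// StdTransport, run, and the message types) and for the `error!` macro are omitted
// here; see ./examples for the exact paths.
use async_trait::async_trait;

pub struct ExampleProcessor;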

#[async_trait]
impl<T: Transport + Send> Processor<T> for ExampleProcessor {
    type Error = ();

    async fn initialize(&mut self, _msg: InitializeMessage) -> Result<(), Self::Error> {
        Ok(())
    }

    async fn process_records(
        &mut self,
        msg: ProcessRecordsMessage,
        checkpointer: &mut Checkpointer<'_, T>,
    ) -> Result<(), Self::Error> {
        for record in msg.records {
            let _bytes = record.to_bytes();

            // Process ...
        }

        if checkpointer.checkpoint(None, None).await.is_err() {
            return Err(());
        }

        Ok(())
    }

    async fn shutdown(
        &mut self,
        _msg: ShutdownMessage,
        _checkpointer: &mut Checkpointer<'_, T>,
    ) -> Result<(), Self::Error> {
        Err(())
    }

    async fn shutdown_requested(
        &mut self,
        _msg: ShutdownRequestedMessage,
        _checkpointer: &mut Checkpointer<'_, T>,
    ) -> Result<(), Self::Error> {
        Err(())
    }

    async fn lease_lost(&mut self, _msg: LeaseLostMessage) -> Result<(), Self::Error> {
        Err(())
    }

    async fn shard_ended(&mut self, _msg: ShardEndedMessage) -> Result<(), Self::Error> {
        Err(())
    }
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    // Setup
    // e.g. tracing, database connections, ...

    // Start KCL process
    if let Err(err) = run(StdTransport::new(), ExampleProcessor).await {
        error!("Failed execution: {err:?}");
        return Err(());
    }

    Ok(())
}
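
As background for the callbacks above: the MultiLangDaemon exchanges line-delimited JSON messages with the processor over stdin/stdout, and the processor acknowledges each action with a status message. The following is a minimal sketch of that wire format using only the standard library. It is not this crate's implementation; the function names (`status_response`, `checkpoint_request`, `callback_for`) are hypothetical, and the mapping from actions to the `Processor` methods shown above is an assumption based on the matching names.

```rust
/// Build the status line a processor writes once it has handled `action`.
fn status_response(action: &str) -> String {
    format!(r#"{{"action":"status","responseFor":"{action}"}}"#)
}

/// Build a checkpoint request. `None` values are serialized as JSON `null`,
/// which asks the daemon to checkpoint at the latest delivered record.
/// No string escaping is done here because sequence numbers are digit strings.
fn checkpoint_request(sequence_number: Option<&str>, sub_sequence_number: Option<u64>) -> String {
    let seq = sequence_number.map_or_else(|| "null".to_string(), |s| format!("\"{s}\""));
    let sub = sub_sequence_number.map_or_else(|| "null".to_string(), |n| n.to_string());
    format!(r#"{{"action":"checkpoint","sequenceNumber":{seq},"subSequenceNumber":{sub}}}"#)
}

/// Map a MultiLangDaemon action name to the `Processor` method from the
/// example above that would plausibly handle it (assumed, not confirmed).
fn callback_for(action: &str) -> Option<&'static str> {
    match action {
        "initialize" => Some("initialize"),
        "processRecords" => Some("process_records"),
        "shutdown" => Some("shutdown"),
        "shutdownRequested" => Some("shutdown_requested"),
        "leaseLost" => Some("lease_lost"),
        "shardEnded" => Some("shard_ended"),
        _ => None,
    }
}
```

Under these assumptions, `checkpoint_request(None, None)` corresponds to the `checkpointer.checkpoint(None, None)` call in `process_records`, and `status_response("processRecords")` is the acknowledgement written after that callback returns.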

Bootstrap

This repo provides a tool, located at ./kcl-bootstrap, that bootstraps the KCL setup: it downloads the required JAR files and provides the command for running the application under KCL. The tool must be run from the repository root and be given the path to a KCL properties file.

# Sets up the JARs and prints the run command
./kcl-bootstrap --properties <PATH-TO-KCL-PROPERTIES>

# Sets up the JARs and executes the run command
./kcl-bootstrap --properties <PATH-TO-KCL-PROPERTIES> --execute
