use crate::mocks::mock_reader::MockReader; use crate::mocks::mock_writer::MockWriter; use kcl::checkpointer::Checkpointer; use kcl::{Processor, Record}; #[derive(Default)] pub struct MockProcessor { pub shard: Option, pub records: Vec, pub lease_lost: bool, pub shard_ended: bool, pub shutdown_requested: bool, } impl Processor for MockProcessor { fn initialize(&mut self, shard_id: &str) { self.shard = Some(shard_id.to_owned()) } fn process_records( &mut self, data: &[Record], _checkpointer: &mut Checkpointer, ) { for record in data { self.records.push((*record).clone()) } } fn lease_lost(&mut self) { self.lease_lost = true; } fn shard_ended(&mut self, _checkpointer: &mut Checkpointer) { self.shard_ended = true; } fn shutdown_requested(&mut self, _checkpointer: &mut Checkpointer) { self.shutdown_requested = true; } } #[derive(Default)] pub struct MockCheckpointingProcessor { pub shard: Option, pub records: Vec, pub lease_lost: bool, pub shard_ended: bool, pub shutdown_requested: bool, } impl Processor for MockCheckpointingProcessor { fn initialize(&mut self, shard_id: &str) { self.shard = Some(shard_id.to_owned()) } fn process_records( &mut self, data: &[Record], checkpointer: &mut Checkpointer, ) { for record in data { self.records.push((*record).clone()) } match checkpointer.checkpoint(None, None) { Ok(_) => {} Err(error) => { panic!("{error}, retryable: {}", error.is_retryable()) } }; } fn lease_lost(&mut self) { self.lease_lost = true; } fn shard_ended(&mut self, _checkpointer: &mut Checkpointer) { self.shard_ended = true; } fn shutdown_requested(&mut self, _checkpointer: &mut Checkpointer) { self.shutdown_requested = true; } }