mod mocks; use kcl::tick; use crate::mocks::mock_processor::MockProcessor; use crate::mocks::mock_reader::MockReader; use crate::mocks::mock_writer::MockWriter; fn tick_into_processor(message: &str) -> (MockProcessor, MockWriter) { let mut processor = MockProcessor::default(); let mut reader = MockReader::with_input(message.to_string()); let mut writer = MockWriter::default(); tick(&mut processor, &mut reader, &mut writer).unwrap(); (processor, writer) } fn assert_status_response(writer: &MockWriter, status: &str) { let expected_out = format!("{{\"action\":\"status\",\"responseFor\":\"{status}\"}}\n"); assert_eq!(writer.outputs.last(), Some(&expected_out)) } #[test] fn test_tick_initialize() { let message = "{\"action\" :\"initialize\", \"shardId\": \"shard1\"}"; let (processor, writer) = tick_into_processor(message); assert_eq!(processor.shard, Some("shard1".to_owned())); assert_eq!(writer.outputs.len(), 1); assert_status_response(&writer, "initialize"); } #[test] fn test_tick_new_record() { let message = "{\"action\": \"processRecords\", \ \"records\": [{\ \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\",\ \"partitionKey\": \"1\",\ \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\",\ \"approximateArrivalTimestamp\": 1570887011763.01}]}"; let (processor, writer) = tick_into_processor(message); let record = processor.records.last().unwrap(); assert_eq!( std::str::from_utf8(record.raw_data.as_slice()).unwrap(), "Hello, this is a test." ); assert_eq!(writer.outputs.len(), 1); assert_status_response(&writer, "processRecords"); } #[test] fn test_tick_lease_lost() { let message = "{\"action\": \"leaseLost\"}"; let (processor, writer) = tick_into_processor(message); assert!(processor.lease_lost); assert_eq!(writer.outputs.len(), 1); assert_status_response(&writer, "leaseLost"); } #[test] fn test_tick_shard_ended() { let message = "{\"action\": \"shardEnded\", \"checkpoint\": \"1234\"}"; let (processor, writer) = tick_into_processor(message); assert!(processor.shard_ended); assert_eq!(writer.outputs.len(), 1); assert_status_response(&writer, "shardEnded"); } #[test] fn test_tick_shutdown_requested() { let message = "{\"action\": \"shutdownRequested\", \"checkpoint\": \"1234\"}"; let (processor, writer) = tick_into_processor(message); assert!(processor.shutdown_requested); assert_eq!(writer.outputs.len(), 1); assert_status_response(&writer, "shutdownRequested"); }