use std::sync::Arc; use std::sync::atomic::{AtomicI64, Ordering}; use std::time::Duration; use tokio_util::sync::CancellationToken; use shiny_common::context::{Context, ContextFactory}; use shiny_common::error::ServiceError; use shiny_common::pointer_utils::ToArc; use shiny_jobs::Job; use shiny_jobs::job_trigger::interval_trigger::IntervalTrigger; use shiny_jobs::jobs_executor::JobsExecutor; struct DefaultContextFactory; impl ContextFactory for DefaultContextFactory { fn create(&self) -> Context { Context::new() } } #[tokio::test] async fn test_jobs_executor() { let mut executor = JobsExecutor::new(DefaultContextFactory.arc()); let counter = AtomicI64::new(0).arc(); let job = JobExample(counter.clone()).arc(); let trigger = IntervalTrigger::new(Duration::from_millis(10)).arc(); executor.schedule("job".to_string(), job, trigger); let cancellation_token = CancellationToken::new(); let child_token = cancellation_token.child_token(); let task = tokio::spawn(async move { executor.run(child_token).await }); tokio::time::sleep(Duration::from_millis(100)).await; cancellation_token.cancel(); tokio::select! { result = task => { result.unwrap() } _ = tokio::time::sleep(Duration::from_millis(100)) => {} } let counter = counter.load(Ordering::SeqCst); assert!(counter >= 7 && counter <= 13, "counter = {counter}"); } struct JobExample(Arc); #[async_trait::async_trait] impl Job for JobExample { async fn execute(&self, _: &mut Context) -> Result<(), ServiceError> { self.0.fetch_add(1, Ordering::SeqCst); Ok(()) } }