use std::collections::HashMap; use std::error::Error; use std::fmt::Display; use crate::controllers::Controllers; use crate::Module; use std::io; use std::pin::pin; use std::process::ExitCode; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; use serde::Deserialize; use thiserror::Error; use shiny_common::pointer_utils::ToArc; use shiny_common::clock::SystemClock; use shiny_configuration::{Configuration, GetConfigurationError}; use shiny_configuration::configuration_builder::BuildConfigurationError; use shiny_configuration::configuration_provider::env_provider::EnvironmentVariablesProvider; use shiny_configuration::configuration_provider::json5_provider::{CreateJson5ProviderFromFileError, Json5Provider}; use shiny_jobs::job_trigger::cron_trigger::{CronTrigger, NewCronTriggerError}; use shiny_jobs::job_trigger::interval_trigger::{IntervalTriggerFromStrError, IntervalTrigger}; use shiny_jobs::job_trigger::JobTrigger; use shiny_jobs::jobs_executor::JobsExecutor; use crate::context_factory::DefaultContextFactory; pub struct Application { controller: Controllers, cancellation_signal_factory: Arc, jobs_configuration_factory: Arc, dynamic_configuration_factory: Arc, } #[async_trait::async_trait] pub trait CancellationSignalFactory { async fn cancellation_signal(&self) -> io::Result<()>; } struct CtrlCCancellationSignalFactory; #[async_trait::async_trait] impl CancellationSignalFactory for CtrlCCancellationSignalFactory { async fn cancellation_signal(&self) -> io::Result<()> { tokio::signal::ctrl_c().await } } impl Application { pub fn new(clients: TClients, module: TModule) -> Self where TModule: Module, { Application { controller: module.create(clients), cancellation_signal_factory: CtrlCCancellationSignalFactory.arc(), jobs_configuration_factory: DefaultJobsConfigurationFactory.arc(), dynamic_configuration_factory: DefaultDynamicConfigurationFactory.arc(), } } pub async fn run(self) -> ExitCode { match self.run_internal().await { Ok(()) => ExitCode::SUCCESS, Err(error) => { tracing::error!("Application failed: {}", error); eprintln!("\n{}", MultilineDisplayAdapter(error)); ExitCode::FAILURE } } } async fn run_internal(self) -> Result<(), RunApplicationError> { tracing::info!("Starting application"); let configuration = self.jobs_configuration_factory.configuration()?; let jobs_configuration: HashMap = configuration.get("jobs").map_err(RunApplicationError::FailedToGetJobsConfiguration)?; let configuration = self.dynamic_configuration_factory.configuration()?; let context_factory = DefaultContextFactory::new(configuration).arc(); let mut job_executor = JobsExecutor::new(context_factory); let clock = SystemClock.arc(); for (name, job) in self.controller.job.take_jobs() { if let Some(configuration) = jobs_configuration.get(&name) { if configuration.enabled.unwrap_or(true) { let trigger: Arc = if configuration.schedule.starts_with("PT") { let trigger = IntervalTrigger::from_str(&configuration.schedule)?; tracing::info!("Registered job `{name}` with interval `{} seconds`", trigger.interval().as_secs()); trigger.arc() } else { let trigger = CronTrigger::new(&configuration.schedule, clock.clone())?; tracing::info!("Registered job `{name}` with schedule `{}`", configuration.schedule); trigger.arc() }; job_executor.schedule(name, job, trigger); } else { tracing::info!("Job `{name}` disabled, skiping"); } } else { return Err(RunApplicationError::MissingConfigurationForJob(name)); } } let cancellation_signal = self.cancellation_signal_factory.cancellation_signal(); let cancellation_token = CancellationToken::new(); let mut future = pin!(job_executor.run(cancellation_token.child_token())); tokio::select! { _ = &mut future => { return Err(RunApplicationError::JobExecutorStoppedUnexpectedly) } result = cancellation_signal => { match result { Ok(_) => { tracing::info!("Cancellation signal received, starting graceful shutdown"); cancellation_token.cancel(); }, Err(err) => { return Err(RunApplicationError::FailedToRegisterCtrlCHandler(err)) } } } } // Cancellation signal received let graceful_period = tokio::time::sleep(Duration::from_secs(10)); tokio::select! { _ = &mut future => { tracing::info!("Application successfully stopped"); Ok(()) } _ = graceful_period => { Err(RunApplicationError::JobExecutorFailedToStopDuringGracefulShutdownPeriod) } } } pub fn set_cancellation_signal_factory(&mut self, cancellation_signal_factory: Arc) { self.cancellation_signal_factory = cancellation_signal_factory; } pub fn set_infrastructure_configuration_factory(&mut self, configuration_factory: Arc) { self.jobs_configuration_factory = configuration_factory; } pub fn set_dynamic_configuration_factory(&mut self, dynamic_configuration_factory: Arc) { self.dynamic_configuration_factory = dynamic_configuration_factory; } } #[derive(Debug, Error)] enum RunApplicationError { #[error("Failed to register control-c handler")] FailedToRegisterCtrlCHandler(#[source] io::Error), #[error("Failed to create configuration")] FailedToCreateConfiguration(#[from] CreateInfrastructureConfigurationError), #[error("Failed to get jobs configuration")] FailedToGetJobsConfiguration(#[from] GetConfigurationError), #[error("Failed to create interval job trigger")] FailedToCreateIntervalJobTrigger(#[from] IntervalTriggerFromStrError), #[error("Failed to create cron job trigger")] FailedToCreateCronJobTrigger(#[from] NewCronTriggerError), #[error("Missing configuration for job {0}")] MissingConfigurationForJob(String), #[error("Job executor failed to stop during graceful shutdown")] JobExecutorFailedToStopDuringGracefulShutdownPeriod, #[error("Job executor stopped unexpectedly")] JobExecutorStoppedUnexpectedly, } pub trait ConfigurationFactory { fn configuration(&self) -> Result; } struct DefaultJobsConfigurationFactory; impl ConfigurationFactory for DefaultJobsConfigurationFactory { fn configuration(&self) -> Result { let root_provider = Json5Provider::from_path("./configuration/jobs.json5")?; let env_provider = EnvironmentVariablesProvider::new(); Ok( Configuration::builder(root_provider) .with_provider("environment", env_provider) .build()? ) } } struct DefaultDynamicConfigurationFactory; impl ConfigurationFactory for DefaultDynamicConfigurationFactory { fn configuration(&self) -> Result { let root_provider = Json5Provider::from_path("./configuration/dynamic.json5")?; let env_provider = EnvironmentVariablesProvider::new(); Ok( Configuration::builder(root_provider) .with_provider("environment", env_provider) .build()? ) } } #[derive(Debug, Error)] pub enum CreateInfrastructureConfigurationError { #[error("Failed to create yaml provider")] FailedToCreateJson5Provider(#[from] CreateJson5ProviderFromFileError), #[error("Failed to create configuration")] FailedToCreateConfiguration(#[from] BuildConfigurationError), } #[derive(Deserialize)] struct JobConfiguration { schedule: String, enabled: Option, } pub struct MultilineDisplayAdapter(pub E); impl Display for MultilineDisplayAdapter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "Error: {}", self.0)?; let mut cause = self.0.source(); if self.0.source().is_some() { writeln!(f, "Caused by:")?; } while let Some(error) = cause { writeln!(f, " {}", error)?; cause = error.source(); } Ok(()) } }