use std::sync::Arc; use chrono::Utc; use futures::TryFutureExt; use lib::database::run_store::RunStore; use lib::database::DatabaseError; use lib::e; use lib::prelude::TonicRequestExt; use lib::service::ServiceContext; use lib::types::{Run, RunId, RunStatus, TriggerId}; use metrics::counter; use proto::common::PaginationIn; use proto::dispatcher_proto::dispatcher_server::Dispatcher; use proto::dispatcher_proto::{ DispatchRequest, DispatchResponse, GetRunRequest, GetRunResponse, ListRunsRequest, ListRunsResponse, }; use thiserror::Error; use tonic::{Request, Response, Status}; use crate::dispatch_manager::DispatchManager; pub(crate) struct DispatcherAPIHandler { #[allow(unused)] context: ServiceContext, dispatch_manager: DispatchManager, run_store: Arc, } impl DispatcherAPIHandler { pub fn new( context: ServiceContext, dispatch_manager: DispatchManager, run_store: Arc, ) -> Self { Self { context, dispatch_manager, run_store, } } } #[tonic::async_trait] impl Dispatcher for DispatcherAPIHandler { async fn dispatch( &self, request: Request, ) -> Result, Status> { let ctx = request.context()?; let request = request.into_inner(); let dispatch_mode = request.mode(); let run_id = RunId::generate(&ctx.project_id); let run = Run { id: run_id.into(), trigger_id: request.trigger_id.unwrap().into(), project_id: ctx.project_id.clone(), created_at: Utc::now(), payload: request.payload.map(|p| p.into()), action: request.action.unwrap().into(), status: RunStatus::Attempting, latest_attempt_id: None, latest_attempt: None, }; counter!("dispatcher.runs_total", 1); e!( context = ctx, RunCreated { meta: run.meta().into() } ); let run = self .dispatch_manager .run(run, dispatch_mode) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(DispatchResponse { run: Some(run.into()), })) } async fn get_run( &self, request: Request, ) -> Result, Status> { let ctx = request.context()?; let request = request.into_inner(); let run_id: RunId = request.run_id.unwrap().into(); let run = self .run_store .get_run(&ctx.project_id, &run_id) .map_err(DispatcherHandlerError::Store) .await?; match run { | Some(run) => { Ok(Response::new(GetRunResponse { run: Some(run.into()), })) } | None => { Err(DispatcherHandlerError::NotFound(run_id.to_string()).into()) } } } async fn list_runs( &self, request: Request, ) -> Result, Status> { let ctx = request.context()?; let request = request.into_inner(); let trigger_id: TriggerId = request.trigger_id.unwrap().into(); let pagination: PaginationIn = request.pagination.unwrap(); let runs = self .run_store .get_runs_by_trigger(&ctx.project_id, &trigger_id, pagination) .await .map_err(DispatcherHandlerError::Store)?; Ok(Response::new(ListRunsResponse { runs: runs.data.into_iter().map(Into::into).collect(), pagination: Some(runs.pagination), })) } } #[derive(Error, Debug)] pub(crate) enum DispatcherHandlerError { #[error("Run '{0}' is unknown to this dispatcher!")] NotFound(String), #[error("Operation on underlying database failed: {0}")] Store(#[from] DatabaseError), } impl From for Status { fn from(e: DispatcherHandlerError) -> Self { // match variants of TriggerError match e { | DispatcherHandlerError::NotFound(e) => Status::not_found(e), | e => Status::invalid_argument(e.to_string()), } } }