use crate::common::{ self, CerberusResult, CerberusError, SubscriptionSummary, Projections, Projection, }; use serde::{ Deserialize, Serialize }; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct SubscriptionDetail { pub event_stream_id: String, pub group_name: String, pub config: SubscriptionConfig, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum NamedConsumerStrategy { RoundRobin, DispatchToSingle, Pinned, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct SubscriptionConfig { pub resolve_linktos: bool, pub start_from: i64, pub message_timeout_milliseconds: usize, pub extra_statistics: bool, pub max_retry_count: usize, pub live_buffer_size: usize, pub buffer_size: usize, pub read_batch_size: usize, pub check_point_after_milliseconds: usize, pub min_check_point_count: usize, pub max_check_point_count: usize, pub max_subscriber_count: usize, pub named_consumer_strategy: NamedConsumerStrategy, } impl Default for SubscriptionConfig { fn default() -> Self { SubscriptionConfig { resolve_linktos: false, start_from: 0, message_timeout_milliseconds: 10_000, extra_statistics: false, max_retry_count: 10, live_buffer_size: 500, buffer_size: 500, read_batch_size: 20, check_point_after_milliseconds: 1_000, min_check_point_count: 10, max_check_point_count: 500, max_subscriber_count: 10, named_consumer_strategy: NamedConsumerStrategy::RoundRobin, } } } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct ProjectionConfig { pub name: String, pub query: String, #[serde(default)] pub emit_enabled: bool, } pub struct Api<'a> { host: &'a str, port: u16, client: reqwest::Client, } pub struct ProjectionConf<'a> { pub name: Option<&'a str>, pub kind: &'a str, pub enabled: bool, pub emit: bool, pub checkpoints: bool, pub track_emitted_streams: bool, pub script: String, } pub struct UpdateProjectionConf<'a> { pub name: &'a str, pub emit: bool, pub track_emitted_streams: bool, pub query: &'a str, } pub enum ClusterState { Cluster(common::ClusterMembers), ProblematicClusterNode, NoCluster, } fn default_error_handler(mut resp: reqwest::Response) -> CerberusResult { if resp.status().is_client_error() { if resp.status() == reqwest::StatusCode::UNAUTHORIZED { return Err( CerberusError::UserFault( "Your current user cannot perform that action.".to_owned())); } if resp.status() == reqwest::StatusCode::NOT_FOUND { return Err( CerberusError::UserFault( "You are asking for a resource that doesn't exist.".to_owned())); } let msg = resp.text().unwrap_or_else(|_| "".to_owned()); return Err( CerberusError::UserFault( format!("User error: {}", msg))); } if resp.status().is_server_error() { let msg = resp.text().unwrap_or_else(|_| "".to_owned()); return Err( CerberusError::UserFault( format!("Server error: [{}] {}", resp.status(), msg))); } Err( CerberusError::DevFault( format!("{:?}", resp))) } fn default_connection_error( api: &Api, error: reqwest::Error, ) -> CerberusError { CerberusError::UserFault( format!("Unable to connect to node {}:{}: {}", api.host(), api.port(), error)) } impl<'a> Api<'a> { pub fn new( host: &'a str, port: u16, user_opt: Option, ) -> Api<'a> { let mut builder = reqwest::Client::builder(); if let Some(user) = user_opt { let mut headers = reqwest::header::HeaderMap::new(); let auth = match user.password { None => format!("{}:", user.login), Some(passw) => format!("{}:{}", user.login, passw), }; let header_value = format!("Basic {}", base64::encode(&auth)); let header_value = reqwest::header::HeaderValue::from_str(&header_value).unwrap(); headers.insert(reqwest::header::AUTHORIZATION, header_value); builder = builder.default_headers(headers); } Api { host, port, client: builder.build().unwrap(), } } pub fn host(&self) -> &str { self.host } pub fn port(&self) -> u16 { self.port } pub fn with_different_node<'b>( &self, host: &'b str, port: u16, ) -> Api<'b> { Api { host, port, client: self.client.clone(), } } pub fn create_projection( &self, conf: ProjectionConf, ) -> CerberusResult { let enabled = format!("{}", conf.enabled); let emit = format!("{}", conf.emit); let checkpoints = format!("{}", conf.checkpoints); let track_emitted_streams_value = format!("{}", conf.track_emitted_streams); let mut query = vec![ ("enabled", enabled.as_str()), ("emit", emit.as_str()), ("checkpoints", checkpoints.as_str()), ("trackemittedstreams", track_emitted_streams_value.as_str()), ("type", "JS") ]; if let Some(name) = conf.name { query.push(("name", name)); } let req = self.client .post(&format!("http://{}:{}/projections/{}", self.host, self.port, conf.kind)) .header(reqwest::header::CONTENT_TYPE, "application/json;charset=UTF-8") .query(&query) .body(conf.script); let mut resp = req.send().map_err(|e| { CerberusError::UserFault( format!("Failed to create a projection: {}", e)) })?; if resp.status().is_success() { return resp.json().map_err(|e| { let msg = resp.text().unwrap_or_else(|_| "".to_owned()); CerberusError::DevFault( format!( "We were unable to deserialize ProjectionCreationSuccess \ out of a projection creation response. \ response body [{}], Error: {}", msg, e)) }); } default_error_handler(resp) } pub fn projection_cropped_info( &self, projection_name: &str, ) -> CerberusResult { let info_opt = self.projection_cropped_info_opt(projection_name)?; match info_opt { Some(info) => Ok(info), None => Err( CerberusError::UserFault( format!("Projection [{}] doesn't exist.", projection_name))), } } pub fn projection_cropped_info_opt( &self, projection_name: &str, ) -> CerberusResult> { let req = self.client .get(&format!("http://{}:{}/projection/{}", self.host, self.port, projection_name)); let mut resp = req.send().map_err(|e| { CerberusError::UserFault( format!("Failed to read projection info (cropped): {}", e)) })?; if resp.status().is_success() { let info = resp.json().map_err(|e| { CerberusError::DevFault( format!( "We were unable to deserialize CroppedProjectionInfo out \ of projection info request: [{}] {:?}", e, resp)) })?; return Ok(Some(info)); } if resp.status() == reqwest::StatusCode::NOT_FOUND { return Ok(None); } default_error_handler(resp) } pub fn node_info(&self) -> CerberusResult { let req = self.client .get(&format!("http://{}:{}/info?format=json", self.host, self.port)); let mut resp = req.send().map_err(|e| CerberusError::UserFault( format!("Unable to connect to node {}:{}: {}", self.host, self.port, e)) )?; if resp.status().is_success() { return resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to parse NodeInfo: {}", e)) ); } default_error_handler(resp) } pub fn gossip(&self) -> CerberusResult { let req = reqwest::Client::new() .get(&format!("http://{}:{}/gossip?format=json", self.host, self.port)); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { let members = resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to deserialize ClusterMembers object: {}", e)) )?; return Ok(ClusterState::Cluster(members)); } if resp.status().is_client_error() { return Ok(ClusterState::NoCluster); } if resp.status().is_server_error() { return Ok(ClusterState::ProblematicClusterNode); } default_error_handler(resp) } pub fn subscriptions(&self) -> CerberusResult> { let req = self.client .get(&format!("http://{}:{}/subscriptions", self.host(), self.port())); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to deserialize SubscriptionSummary: {}", e)) ); } default_error_handler(resp) } pub fn subscriptions_raw(&self) -> CerberusResult> { let req = self.client .get(&format!("http://{}:{}/subscriptions", self.host(), self.port())); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to deserialize SubscriptionSummary raw: {}", e)) ); } default_error_handler(resp) } pub fn subscription_raw( &self, stream: &str, group_id: &str, ) -> CerberusResult { let url = format!( "http://{}:{}/subscriptions/{}/{}/info", self.host(), self.port(), stream, group_id); let req = self.client.get(&url); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return resp.json().map_err(|e| CerberusError::UserFault( format!("Failed to deserialize SubscriptionSummary: {}", e)) ); } default_error_handler(resp) } // Note used at the moment but will be later. fn _subscription( &self, stream: &str, group_id: &str, ) -> CerberusResult { let sub_opt = self.subscription_opt(stream, group_id)?; match sub_opt { Some(sub) => Ok(sub), None => Err( CerberusError::UserFault( format!("Persistent subscription targetting [{}] stream on group [{}] doesn't exist.", stream, group_id))), } } pub fn subscription_opt( &self, stream: &str, group_id: &str, ) -> CerberusResult> { let url = format!( "http://{}:{}/subscriptions/{}/{}/info", self.host(), self.port(), stream, group_id); let req = self.client.get(&url); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { let detail = resp.json().map_err(|e| CerberusError::UserFault( format!("Failed to deserialize SubscriptionDetail: {}", e)) )?; return Ok(Some(detail)); } if resp.status() == reqwest::StatusCode::NOT_FOUND { return Ok(None); } default_error_handler(resp) } pub fn projections( &self, kind: &str, ) -> CerberusResult> { let req = self.client .get(&format!("http://{}:{}/projections/{}", self.host, self.port, kind)); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { let result: Projections = resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to deserialize Projections: {}", e)) )?; return Ok(result.projections); } default_error_handler(resp) } pub fn create_subscription( &self, stream: &str, group: &str, config: SubscriptionConfig, ) -> CerberusResult<()> { let url = format!( "http://{}:{}/subscriptions/{}/{}", self.host, self.port, stream, group); let req = self.client .put(&url) .header(reqwest::header::CONTENT_TYPE, "application/json;charset=UTF-8") .body(serde_json::to_vec(&config).unwrap()); let resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return Ok(()); } default_error_handler(resp) } pub fn update_subscription( &self, stream: &str, group: &str, config: SubscriptionConfig, ) -> CerberusResult<()> { let url = format!( "http://{}:{}/subscriptions/{}/{}", self.host, self.port, stream, group); let req = self.client .post(&url) .header(reqwest::header::CONTENT_TYPE, "application/json;charset=UTF-8") .body(serde_json::to_vec(&config).unwrap()); let resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return Ok(()); } default_error_handler(resp) } pub fn projection_config( &self, projection_name: &str, ) -> CerberusResult { let url = format!( "http://{}:{}/projection/{}/query", self.host(), self.port(), projection_name); let query = vec![ ("config", "yes"), ]; let req = self.client .get(&url) .query(&query); let mut resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return resp.json().map_err(|e| CerberusError::DevFault( format!("Failed to deserialize ProjectionConfig: {}", e)) ); } default_error_handler(resp) } pub fn update_projection_query( &self, conf: UpdateProjectionConf, ) -> CerberusResult<()> { let url = format!( "http://{}:{}/projection/{}/query", self.host(), self.port(), conf.name); let emit_value = format!("{}", conf.emit); let track_emitted_streams_value = format!("{}", conf.track_emitted_streams); let query_params = vec![ ("emit", emit_value.as_str()), ("trackemittedstreams", track_emitted_streams_value.as_str()), ("type", "js"), ]; let req = self.client .put(url.as_str()) .header(reqwest::header::CONTENT_TYPE, "application/json;charset=UTF-8") .query(&query_params) .body(conf.query.to_owned()); let resp = req.send().map_err(|e| default_connection_error(self, e) )?; if resp.status().is_success() { return Ok(()); } default_error_handler(resp) } }