use crate::common::{
self,
CerberusResult,
CerberusError,
SubscriptionSummary,
Projections,
Projection,
};
use serde::{ Deserialize, Serialize };
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionDetail {
pub event_stream_id: String,
pub group_name: String,
pub config: SubscriptionConfig,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum NamedConsumerStrategy {
RoundRobin,
DispatchToSingle,
Pinned,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionConfig {
pub resolve_linktos: bool,
pub start_from: i64,
pub message_timeout_milliseconds: usize,
pub extra_statistics: bool,
pub max_retry_count: usize,
pub live_buffer_size: usize,
pub buffer_size: usize,
pub read_batch_size: usize,
pub check_point_after_milliseconds: usize,
pub min_check_point_count: usize,
pub max_check_point_count: usize,
pub max_subscriber_count: usize,
pub named_consumer_strategy: NamedConsumerStrategy,
}
impl Default for SubscriptionConfig {
fn default() -> Self {
SubscriptionConfig {
resolve_linktos: false,
start_from: 0,
message_timeout_milliseconds: 10_000,
extra_statistics: false,
max_retry_count: 10,
live_buffer_size: 500,
buffer_size: 500,
read_batch_size: 20,
check_point_after_milliseconds: 1_000,
min_check_point_count: 10,
max_check_point_count: 500,
max_subscriber_count: 10,
named_consumer_strategy: NamedConsumerStrategy::RoundRobin,
}
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ProjectionConfig {
pub name: String,
pub query: String,
#[serde(default)]
pub emit_enabled: bool,
}
pub struct Api<'a> {
host: &'a str,
port: u16,
client: reqwest::Client,
}
pub struct ProjectionConf<'a> {
pub name: Option<&'a str>,
pub kind: &'a str,
pub enabled: bool,
pub emit: bool,
pub checkpoints: bool,
pub track_emitted_streams: bool,
pub script: String,
}
pub struct UpdateProjectionConf<'a> {
pub name: &'a str,
pub emit: bool,
pub track_emitted_streams: bool,
pub query: &'a str,
}
pub enum ClusterState {
Cluster(common::ClusterMembers),
ProblematicClusterNode,
NoCluster,
}
fn default_error_handler(mut resp: reqwest::Response) -> CerberusResult {
if resp.status().is_client_error() {
if resp.status() == reqwest::StatusCode::UNAUTHORIZED {
return Err(
CerberusError::UserFault(
"Your current user cannot perform that action.".to_owned()));
}
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Err(
CerberusError::UserFault(
"You are asking for a resource that doesn't exist.".to_owned()));
}
let msg = resp.text().unwrap_or_else(|_| "".to_owned());
return Err(
CerberusError::UserFault(
format!("User error: {}", msg)));
}
if resp.status().is_server_error() {
let msg = resp.text().unwrap_or_else(|_| "".to_owned());
return Err(
CerberusError::UserFault(
format!("Server error: [{}] {}", resp.status(), msg)));
}
Err(
CerberusError::DevFault(
format!("{:?}", resp)))
}
fn default_connection_error(
api: &Api,
error: reqwest::Error,
) -> CerberusError {
CerberusError::UserFault(
format!("Unable to connect to node {}:{}: {}", api.host(), api.port(), error))
}
impl<'a> Api<'a> {
pub fn new(
host: &'a str,
port: u16,
user_opt: Option,
) -> Api<'a> {
let mut builder = reqwest::Client::builder();
if let Some(user) = user_opt {
let mut headers = reqwest::header::HeaderMap::new();
let auth = match user.password {
None => format!("{}:", user.login),
Some(passw) => format!("{}:{}", user.login, passw),
};
let header_value = format!("Basic {}", base64::encode(&auth));
let header_value = reqwest::header::HeaderValue::from_str(&header_value).unwrap();
headers.insert(reqwest::header::AUTHORIZATION, header_value);
builder = builder.default_headers(headers);
}
Api {
host,
port,
client: builder.build().unwrap(),
}
}
pub fn host(&self) -> &str {
self.host
}
pub fn port(&self) -> u16 {
self.port
}
pub fn with_different_node<'b>(
&self,
host: &'b str,
port: u16,
) -> Api<'b> {
Api {
host,
port,
client: self.client.clone(),
}
}
pub fn create_projection(
&self,
conf: ProjectionConf,
) -> CerberusResult {
let enabled = format!("{}", conf.enabled);
let emit = format!("{}", conf.emit);
let checkpoints = format!("{}", conf.checkpoints);
let track_emitted_streams_value = format!("{}", conf.track_emitted_streams);
let mut query = vec![
("enabled", enabled.as_str()),
("emit", emit.as_str()),
("checkpoints", checkpoints.as_str()),
("trackemittedstreams", track_emitted_streams_value.as_str()),
("type", "JS")
];
if let Some(name) = conf.name {
query.push(("name", name));
}
let req = self.client
.post(&format!("http://{}:{}/projections/{}", self.host, self.port, conf.kind))
.header(reqwest::header::CONTENT_TYPE, "application/json;charset=UTF-8")
.query(&query)
.body(conf.script);
let mut resp = req.send().map_err(|e| {
CerberusError::UserFault(
format!("Failed to create a projection: {}", e))
})?;
if resp.status().is_success() {
return resp.json().map_err(|e| {
let msg = resp.text().unwrap_or_else(|_| "".to_owned());
CerberusError::DevFault(
format!(
"We were unable to deserialize ProjectionCreationSuccess \
out of a projection creation response. \
response body [{}], Error: {}", msg, e))
});
}
default_error_handler(resp)
}
pub fn projection_cropped_info(
&self,
projection_name: &str,
) -> CerberusResult {
let info_opt = self.projection_cropped_info_opt(projection_name)?;
match info_opt {
Some(info) =>
Ok(info),
None =>
Err(
CerberusError::UserFault(
format!("Projection [{}] doesn't exist.", projection_name))),
}
}
pub fn projection_cropped_info_opt(
&self,
projection_name: &str,
) -> CerberusResult