#![cfg(feature = "serde")] use collectd_plugin::{ collectd_plugin, CollectdLoggerBuilder, ConfigItem, Plugin, PluginCapabilities, PluginManager, PluginRegistration, Value, ValueList, }; use log::error; use log::LevelFilter; use serde::Deserialize; use std::borrow::Cow; use std::error; use std::io::Write; use std::net::TcpStream; use std::ops::Deref; use std::sync::Mutex; /// Here is what our collectd config can look like: /// /// ``` /// LoadPlugin write_graphite_rust /// /// /// Name "localhost.1" /// Address "127.0.0.1:20003" /// /// /// Name "localhost.2" /// Address "127.0.0.1:20004" /// Prefix "iamprefix" /// /// /// ``` #[derive(Deserialize, Debug, PartialEq, Default)] #[serde(deny_unknown_fields)] struct GraphiteConfig { #[serde(rename = "Node")] nodes: Vec, } #[derive(Deserialize, Debug, PartialEq, Default)] #[serde(rename_all = "PascalCase")] #[serde(deny_unknown_fields)] struct GraphiteNode { name: String, address: String, prefix: Option, } struct GraphitePlugin { // We need a mutex as writers aren't thread safe writer: Mutex, prefix: Option, } struct GraphiteManager; impl PluginManager for GraphiteManager { fn name() -> &'static str { "write_graphite_rust" } fn plugins( config: Option<&[ConfigItem<'_>]>, ) -> Result> { // Register a logging hook so that any usage of the `log` crate will be forwarded to // collectd's logging facilities CollectdLoggerBuilder::new() .prefix_plugin::() .filter_level(LevelFilter::Info) .try_init() .expect("really the only thing that should create a logger"); // Deserialize the collectd configuration into our configuration struct let config: GraphiteConfig = collectd_plugin::de::from_collectd(config.unwrap_or_else(Default::default))?; let config: Vec<(String, Box)> = config .nodes .into_iter() .map(|x| { let plugin = GraphitePlugin { writer: Mutex::new(TcpStream::connect(x.address)?), prefix: x.prefix, }; let bx: Box = Box::new(plugin); Ok((x.name, bx)) }) .collect::, Box>>()?; Ok(PluginRegistration::Multiple(config)) } } /// If necessary removes any characters from a string that have special meaning in graphite. fn graphitize(s: &str) -> Cow<'_, str> { let needs_modifying = s .chars() .any(|x| x == '.' || x.is_whitespace() || x.is_control()); if !needs_modifying { Cow::Borrowed(s) } else { let new_s: String = s .chars() .map(|x| { if x == '.' || x.is_whitespace() || x.is_control() { '-' } else { x } }) .collect(); Cow::Owned(new_s) } } impl GraphitePlugin { fn write_value(&self, mut line: String, val: Value, dt: &str) { line.push(' '); line.push_str(val.to_string().as_str()); line.push(' '); line.push_str(dt); line.push('\n'); // Finally, we get our exclusive lock on the tcp writer and send our data down the pipe. If // there is a failure, the proper response would be to try and allocate a new connection or // backoff. Instead we log the error. let mut w = self.writer.lock().unwrap(); if let Err(ref e) = w.write(line.as_bytes()) { error!("could not write to graphite: {}", e); } } } impl Plugin for GraphitePlugin { fn capabilities(&self) -> PluginCapabilities { PluginCapabilities::WRITE } fn write_values(&self, list: ValueList<'_>) -> Result<(), Box> { // We use a heap allocated string to construct data to send to graphite. Collectd doesn't // use the heap (preferring fixed size arrays). We could get the same behavior using the // ArrayString type from the arrayvec crate. let mut line = String::new(); if let Some(ref prefix) = self.prefix { line.push_str(prefix.as_str()); line.push('.'); } line.push_str(graphitize(list.host).deref()); line.push('.'); line.push_str(graphitize(list.plugin).deref()); if let Some(instance) = list.plugin_instance { line.push('-'); line.push_str(graphitize(instance).deref()); } line.push('.'); line.push_str(graphitize(list.type_).deref()); if let Some(type_instance) = list.type_instance { line.push('-'); line.push_str(graphitize(type_instance).deref()); } let dt = list.time.timestamp().to_string(); // If there is only one value in the list we don't have to clone our premade string, // instead we can write it directly if list.values.len() == 1 { self.write_value(line, list.values[0].value, dt.as_str()); } else { for v in list.values { let mut nv = line.clone(); nv.push('.'); nv.push_str(graphitize(v.name).deref()); self.write_value(nv, v.value, dt.as_str()); } } Ok(()) } } collectd_plugin!(GraphiteManager); #[cfg(test)] mod tests { use super::*; #[test] fn test_graphitize() { assert_eq!(graphitize("hello").deref(), "hello"); assert_eq!(graphitize("hello.maty").deref(), "hello-maty"); assert_eq!(graphitize("foo@bar.com").deref(), "foo@bar-com"); assert_eq!(graphitize(" test \n ").deref(), "--test---"); } }