#![feature(scoped)] extern crate iron; extern crate router; extern crate rustc_serialize; extern crate toml; extern crate term; use toml::{Parser, Value}; use rustc_serialize::json::{ToJson, Json}; use iron::{Iron, Request, Response, IronResult}; use iron::status; use iron::middleware::Handler; use router::{Router}; use std::process::{Command, Stdio}; use std::vec::Vec; use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; use std::collections::{HashMap, BTreeMap}; use std::io::prelude::*; use std::fs::File; mod logger; #[derive(Debug)] enum ProcessStatus { Started, Running(usize), Done(bool), RetriesExceeded } impl ToJson for ProcessStatus { fn to_json(&self) -> Json { let mut d = BTreeMap::new(); match *self { ProcessStatus::Started => { d.insert("state".to_string(), "started".to_json()); }, ProcessStatus::RetriesExceeded => { d.insert("state".to_string(), "retries_exceeded".to_json()); }, ProcessStatus::Running(n) => { d.insert("state".to_string(), "running".to_string().to_json()); d.insert("meta".to_string(), n.to_json()); }, ProcessStatus::Done(b) => { d.insert("state".to_string(), "done".to_string().to_json()); d.insert("meta".to_string(), b.to_json()); } } Json::Object(d) } } #[derive(Debug)] struct ManagedProcess { name : String, command : String, max_retries : usize, current_try : Arc>, status : Arc>, color : term::color::Color } impl ToJson for ManagedProcess { fn to_json(&self) -> Json { let mut d = BTreeMap::new(); d.insert("name".to_string(), self.name.to_json()); d.insert("command".to_string(), self.command.to_json()); d.insert("max_retries".to_string(), self.max_retries.to_json()); { d.insert("status".to_string(), self.status.read().unwrap().to_json()); } Json::Object(d) } } impl ManagedProcess { fn new(color : term::color::Color, name : String, command : String, max_retries : usize) -> ManagedProcess { let p = ManagedProcess { name : name, command : command, max_retries : max_retries, current_try : Arc::new(RwLock::new(1)), status : Arc::new(RwLock::new(ProcessStatus::Started)), color : color }; p } fn start(&self) { loop { { let mut s = self.status.write().unwrap(); *s = ProcessStatus::Running(self.current_try.read().unwrap().clone()); } let mut child = Command::new(self.command.to_owned()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .unwrap_or_else(|e| { panic!("oops: {}", e) }); logger::follow("stdout".to_string(), self.color, self.name.to_owned(), child.stdout.take()); logger::follow("stderr".to_string(), self.color, self.name.to_owned(), child.stderr.take()); match child.wait() { Ok(status) => { { let mut s = self.status.write().unwrap(); if status.success() { *s = ProcessStatus::Done(true); println!{"[rust-pm] info: {} - finished.", self.name} break; } } }, Err(e) => { panic!("oops: {}", e) } } if *self.current_try.read().unwrap() == self.max_retries && self.max_retries > 0 { { let mut s = self.status.write().unwrap(); *s = ProcessStatus::RetriesExceeded; } println!{"[rust-pm] info: {} - reached max retries of {}. Giving up.", self.name, self.max_retries } break; } { let max_retries_str = if self.max_retries == 0 { "Forever".to_string() } else { format!("{}", self.max_retries) }; let mut cur = self.current_try.write().unwrap(); println!{"[rust-pm] info: {} - exited with error. Try {} out of {}.", self.name, *cur, max_retries_str } *cur += 1; } } } } struct ProcessManager { processes : HashMap>>, handlers : HashMap> } impl ProcessManager { fn new() -> ProcessManager { let pm = ProcessManager { processes : HashMap::new(), handlers : HashMap::new() }; pm } fn manage(&mut self, p : ManagedProcess) { self.processes.insert(p.name.clone(), Arc::new(RwLock::new(p))); } fn start_all(&mut self) { for (name, process) in self.processes.iter() { let ft1 = process.clone(); let h = thread::spawn(move || { let p = ft1.read().unwrap(); p.start(); }); self.handlers.insert(name.clone(), h); } } } struct IndexHandler { pm : Arc> } impl IndexHandler { fn new(pm : &Arc>) -> IndexHandler { IndexHandler { pm : pm.clone() } } } impl Handler for IndexHandler { fn handle(&self, _ : &mut Request) -> IronResult { let mut ps : Vec = Vec::new(); for p in self.pm.lock().unwrap().processes.values() { ps.push(p.read().unwrap().to_json()); } let json_ps = ps.to_json().to_string(); Ok(Response::with((status::Ok, json_ps))) } } fn load_config() -> BTreeMap { let mut f = File::open("./rust-pm.toml").ok().unwrap(); let mut s = String::new(); let _ = f.read_to_string(&mut s); let mut tom_parser = Parser::new(s.as_ref()); match tom_parser.parse() { Some(v) => { v }, None => { panic!("Invalid cofig: {:?}", tom_parser.errors); } } } fn main () { let mut pm = ProcessManager::new(); let mut table = load_config(); let server_port = table.remove("server-port") .expect("Server port required"); let mut colors = logger::ColorSelector::new(); for (k, c) in table { let sub_table = c.as_table(); if !sub_table.is_some() { continue; } let command = sub_table.unwrap().get("command") .expect("command required") .as_str() .expect("Invalir value for command").to_string(); let retries = sub_table.unwrap().get("max_retries") .expect("max_retries required") .as_integer().expect("Invalid value for max_retries"); pm.manage(ManagedProcess::new(colors.next(), k, command, retries as usize)); } let arc_pm = Arc::new(Mutex::new(pm)); let mut router = Router::new(); router.get("/", IndexHandler::new(&arc_pm)); router.get("/:query", handler); arc_pm.lock().unwrap().start_all(); Iron::new(router).http(server_port.as_str().expect("Invalid server:port")).unwrap(); fn handler(req: &mut Request) -> IronResult { let ref query = req.extensions.get::() .unwrap().find("query").unwrap_or("/"); Ok(Response::with((status::Ok, *query))) } }