use mesos::{Scheduler, SchedulerClient, SchedulerConf, ProtobufCallbackRouter, run_protobuf_scheduler}; use mesos::proto::*; use mesos::util; struct TestScheduler { max_id: u64, } impl TestScheduler { fn get_id(&mut self) -> u64 { self.max_id += 1; self.max_id } } impl Scheduler for TestScheduler { fn subscribed(&mut self, client: &SchedulerClient, framework_id: &FrameworkID, heartbeat_interval_seconds: Option) { println!("received subscribed"); client.reconcile(vec![]); } // Inverse offers are only available with the HTTP API // and are great for doing things like triggering // replication with stateful services before the agent // goes down for maintenance. fn inverse_offers(&mut self, client: &SchedulerClient, inverse_offers: Vec<&InverseOffer>) { println!("received inverse offers"); // this never lets go willingly let offer_ids = inverse_offers.iter() .map(|o| o.get_id().clone()) .collect(); client.decline(offer_ids, None); } fn offers(&mut self, client: &SchedulerClient, offers: Vec<&Offer>) { // Offers are guaranteed to be for the same agent, and // there will be at least one. let agent_id = offers[0].get_agent_id(); println!("received {} offers from agent {}", offers.len(), agent_id.get_value()); let offer_ids: Vec = offers.iter() .map(|o| o.get_id().clone()) .collect(); // get resources with whatever filters you need let mut offer_cpus: f64 = offers.iter() .flat_map(|o| o.get_resources()) .filter(|r| r.get_name() == "cpus") .map(|c| c.get_scalar()) .fold(0f64, |acc, cpu_res| { acc + cpu_res.get_value() }); // or use this if you don't require special filtering let mut offer_mem = util::get_scalar_resource_sum("mem", offers); let mut tasks = vec![]; while offer_cpus >= 1f64 && offer_mem >= 128f64 { let name = &*format!("sleepy-{}", self.get_id()); let task_id = util::task_id(name); let mut command = CommandInfo::new(); command.set_value("env && sleep 10".to_string()); let mem = util::scalar("mem", "*", 128f64); let cpus = util::scalar("cpus", "*", 1f64); let resources = vec![mem, cpus]; let task_info = util::task_info(name, &task_id, agent_id, &command, resources); tasks.push(task_info); offer_cpus -= 1f64; offer_mem -= 128f64; } client.launch(offer_ids, tasks, None); } fn rescind(&mut self, client: &SchedulerClient, offer_id: &OfferID) { println!("received rescind"); } fn update(&mut self, client: &SchedulerClient, status: &TaskStatus) { println!("received update {:?} from {}", status.get_state(), status.get_task_id().get_value()); } fn message(&mut self, client: &SchedulerClient, agent_id: &AgentID, executor_id: &ExecutorID, data: Vec) { println!("received message"); } fn failure(&mut self, client: &SchedulerClient, agent_id: Option<&AgentID>, executor_id: Option<&ExecutorID>, status: Option) { println!("received failure"); } fn error(&mut self, client: &SchedulerClient, message: String) { println!("received error"); } fn heartbeat(&mut self, client: &SchedulerClient) { println!("received heartbeat"); } fn disconnected(&mut self) { println!("disconnected from scheduler"); } } #[test] fn main() { let mut scheduler = TestScheduler { max_id: 0 }; let conf = SchedulerConf { master_url: "http://localhost:5050".to_string(), user: "root".to_string(), name: "rust http".to_string(), framework_timeout: 0f64, implicit_acknowledgements: true, framework_id: None, }; // If you don't like the callback approach, you can implement // an event router of your own. This is merely provided for // those familiar with the mesos libraries in other languages. let mut router = ProtobufCallbackRouter { scheduler: &mut scheduler, conf: conf.clone(), }; run_protobuf_scheduler(&mut router, conf) }