// use serde::{Serialize, Deserialize}; // // use std::collections::HashMap; // use zookeeper::ZooKeeper; // use std::sync::Arc; // use zkmq::ZkPath; // use std::convert::TryInto; // use chrono::{Utc, DateTime}; // use chrono::serde::ts_seconds; // use anyhow::Context; // use log::*; // // // const STATE_EVAL_INTERVAL: i64 = 30; // // // macro_rules! map( // { $($key:expr => $value:expr),+ } => { // { // let mut m = ::std::collections::HashMap::new(); // $( // m.insert($key, $value); // )+ // m // } // }; // ); // // #[derive(Serialize, Deserialize, PartialEq)] // enum JobType { // Infinite, // Finite(u16) // } // // #[derive(Serialize, Deserialize, PartialEq)] // enum SchedulePolicy { // /// The task is inserted with a schedule_after value of Now(), and on execution the schedule_adter // /// is set to Now() // Immediate, // /// Delay execution(seconds). // /// The task is inserted with a schedule_after value of Now(), and on on execute the schedule_after // /// time is set to Now() + Seconds // Delayed(u64), // /// Reference Time, Seconds // /// Tasks will be scheduled relative Reference Time. If the RefTime is 2021-03-06 21:00:00, and // /// the current time is 2021-03-07 21:30:00 with a Interval of 120 seconds. Next Execution Time // /// is calculated by adding 120 seconds to the reference time until the timestamp is in the future. // /// That timestamp is used for the schedule_after field of the task // Scheduled(u64, u64) // } // // #[derive(Serialize, Deserialize, PartialEq)] // enum RetryPolicy { // /// Only try once // SuccessOrFailure, // /// Tru N times // SuccessOrRetry(u8), // /// Try N times, backing off (Ntotal - Ncurremt)! (like 3! = 3*2*1) seconds // SuccessOrBackoff(u8) // } // // #[derive(Serialize, Deserialize)] // struct Job { // /// Resources the job is requesting // resources: HashMap, // /// Method being invoked // method: String, // /// Arguments to pass to the method // arguments: HashMap, // /// Job Type // job_type: JobType, // /// Scheduling Policy // policy: SchedulePolicy, // // retry: RetryPolicy, // // #[serde(with = "ts_seconds")] // schedule_after: DateTime // } // // #[derive(Serialize, Deserialize)] // enum JobState { // // // // Pending (Back off) - how many times has this job attempted to be scheduled // Pending(u8), // // Running (Back off) - how many times has the probe returned Lost under BEFORE_LOST threshold // Running, // Exited(u8) // } // // #[derive(Serialize, Deserialize)] // enum JobEvaluator { // Noop, // // Validate Health of the Job // Evaluate, // /// Evaluate the // EvaluateExit, // /// Probe the health of the instance on the executor // Probe, // // } // // /// Scan all jobs in //job/ // fn state_evaluator(zk: Arc) -> anyhow::Result<()> { // let dir = ZkPath::new("/jotty"); // // read //processes/state_evaluation/last_run to see if my StateEvaluationRate has passed // if let Ok(timestamp) = zk.get_data(&*dir.join("proc/state_eval/last_run").to_string(), false) { // let ts = chrono::NaiveDateTime::from_timestamp(i64::from_be_bytes(timestamp.0.try_into().unwrap()), 0); // // if the last run time + our STATE_EVAL_INTERVAL is in the past, we can run // if (ts + chrono::Duration::seconds(STATE_EVAL_INTERVAL)) > chrono::Utc::now().naive_utc() { // println!("not due for another evaluation, sleeping"); // return Ok(()); // } // } // // if it has, try and latch //processes/state_evaluation/lock // // let latch = zookeeper::recipes::leader::LeaderLatch::new(zk.clone(), "state_evaluator".to_string(), dir.join("lock").to_string()); // if !latch.has_leadership() { // println!("cannot lock state_evaluator"); // return Ok(()); // } // let sync = zkmq::producer::ZkMQProducer::new(zk.clone(), &*dir.join("queue").join("job_eval").to_string(), Some("state_evaluator")).unwrap(); // // // get all children in //job // let all_jobs = zk.get_children(&*dir.join("job").to_string(), false)?; // all_jobs.iter().for_each(|v| { sync.produce(v.as_bytes()); }); // // Ok(()) // } // // // struct ExecutorState { // pub id: String, // // pub address: String, // pub port: u16, // } // // // fn probe_job_health(zk: Arc, job: String) -> anyhow::Result> { // // let allocs=zk.get_children(&*format!("/jotty/job/{}/alloc", &job), false).context("looking up executor id")?; // debug!("job {} has {} allocations", &job, allocs.len()); // // for alloc in allocs { // // get the executor ID // let executor_id = String::from_utf8(zk.get_data("/jotty/job/{}/alloc/{}", false)?.0)?; // let alloc_state: AllocState = serde_json::from_slice(&*zk.get_data(&*format!("/jotty/executor/{}/alloc/{}", &executor_id, &alloc), false)?.0)?; // // } // // let lookup = zk.get_data(&*format!("/jotty/executor/{}/state", &job), false); // if lookup.is_err() { // Ok(Err(JobProbeError::ExecutorNotRegistered)) // } // let executor_state: ExecutorState = serde_json::from_slice(&*lookup.unwrap().0).context("parsing executor state")?; // // let executor_psk = String::from_utf8(zk.get_data(&*format!("/jotty/job/{}/executor", &job), false).context("looking up executor id")?.0).context("converting executor raw data to a string")?; // // // Ok(Ok(())) // } // // fn scheduler() { // // } // // fn job_evaluator(zk: Arc) -> anyhow::Result<()> { // let mut queue = zkmq::consumer::ZkMQConsumer::new(zk.clone(), &*dir.join("queue").join("job_eval").to_string(), Some("job_evaluator_1")).unwrap(); // // loop { // let message_r = queue.consume(None); // if message_r.is_err() { // // } // let message = message_r.unwrap(); // // let operation: JobEvaluator = serde_json::from_slice(&*message.body).unwrap(); // match operation { // JobEvaluator::Noop => {} // JobEvaluator::Evaluate => {} // JobEvaluator::EvaluateExit => {} // JobEvaluator::Probe => {} // } // } // // // for each child, figure out what operation to take: // // - read //job//state // // - If Pending: // // - Read //job//schedule_after, and check if the time is in the past // // - Evaluate JobType, SchedulePolicy, and RetryPolicy, or transisition job to a exited state // // - Create Schedule(job_id) task // // - If Running: // // - Create a Probe(job_id) task // // - If Exited: // // - Eval RetryPolicy, if exit code != 0 // // - Eval JobType, decrement Finite value if applicable // // - Eval SchedulePolicy for when schedule_after should be set to // // - Set /state to either Pending or Completed // // - If Completed: // // - do nothing // } // // fn main() { // // // let job = Job { // // resources: map!{ // // "cpu_cores" => 2, // // "memory" => 512, // // }, // // method: "co.volf.nomadic.spawn_gn1_vm".to_string(), // // arguments: map!{ // // "instance_id".to_string() => "svm-a1", // // } // // // // }; // // } fn main() {}