use std::collections::{HashMap, VecDeque}; use std::fs::{DirBuilder, File}; use std::io::{self, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; use base64::engine::general_purpose; use base64::Engine as _; use lunatic::ap::{AbstractProcess, Config, ProcessRef}; use lunatic::supervisor::{Supervisor, SupervisorConfig}; use lunatic::{abstract_process, ProcessName}; use serde::{Deserialize, Serialize}; use submillisecond::params::Params; use submillisecond::response::Response; use submillisecond::{router, Application, Json, RequestContext, Router}; use uuid::Uuid; // ===================================== // Middleware for requests // ===================================== fn logging_middleware(req: RequestContext) -> Response { let request_id = req .headers() .get("x-request-id") .and_then(|req_id| req_id.to_str().ok()) .map(|req_id| req_id.to_string()) .unwrap_or_else(|| "DEFAULT_REQUEST_ID".to_string()); println!("[ENTER] request {request_id}"); let res = req.next_handler(); println!("[EXIT] request {request_id}"); res } // ===================================== // Persistence utils // ===================================== const NEWLINE: &[u8] = &[b'\n']; /// Every Line is a new "state change" entry /// Each line starts with one of the following keywords /// that indicate the type of entry const NEW_USER: u8 = 1; const PUSH_TODO: u8 = 2; const POLL_TODO: u8 = 3; #[derive(Debug)] pub struct FileLog { // cwd: str, // file_name: str, full_path: PathBuf, file: File, } #[derive(Serialize, Deserialize)] struct PushEntry { user_uuid: Uuid, todo: Todo, } impl FileLog { pub fn new(cwd: &str, file_name: &str) -> FileLog { DirBuilder::new().recursive(true).create(cwd).unwrap(); let full_path = Path::new(cwd).join(file_name); FileLog { // cwd, // file_name, full_path: full_path.clone(), file: match File::create(&full_path) { Err(why) => panic!("couldn't open {cwd:?}: {why}"), // write 0 as initial cursor Ok(file) => file, }, } } pub fn append_new_user(&mut self, user: &User) { self.append(NEW_USER, ron::to_string(user).unwrap().as_bytes()) } pub fn append_poll_todo(&mut self, user_uuid: Uuid, todo_uuid: Uuid) { self.append( POLL_TODO, ron::to_string(&(user_uuid, todo_uuid)).unwrap().as_bytes(), ) } pub fn append_push_todo(&mut self, user_uuid: Uuid, todo: Todo) { self.append( PUSH_TODO, ron::to_string(&PushEntry { user_uuid, todo }) .unwrap() .as_bytes(), ) } pub fn append(&mut self, header: u8, data: &[u8]) { // let x: MyStruct = ron::from_str("(boolean: true, float: 1.23)").unwrap(); let encoded = general_purpose::STANDARD.encode(data); let buf = [&[header], encoded.as_bytes(), NEWLINE].concat(); match self.file.write_all(&buf) { Err(why) => panic!( "[FileLog {:?}] couldn't write to file: {}", self.full_path, why ), Ok(_) => println!( "[FileLog {:?}] Successfully appended log to file", self.full_path ), }; } } // ===================================== // Persistence process definition // ===================================== pub struct PersistenceSup; impl Supervisor for PersistenceSup { type Arg = String; type Children = (PersistenceProcess,); fn init(config: &mut SupervisorConfig, name: Self::Arg) { // Always register the `PersistenceProcess` under the name passed to the // supervisor. config.children_args((((), Some(name)),)) } } #[derive(ProcessName)] pub struct PersistenceProcessID; pub struct PersistenceProcess { users: HashMap, users_nicknames: HashMap, wal: FileLog, } #[abstract_process(visibility=pub)] impl PersistenceProcess { #[init] fn init(_: Config, _: ()) -> Result { // Coordinator shouldn't die when a client dies. This makes the link // one-directional. unsafe { lunatic::host::api::process::die_when_link_dies(0) }; Ok(PersistenceProcess { users: HashMap::new(), users_nicknames: HashMap::new(), wal: FileLog::new("/persistence", "todos.wal"), }) } #[handle_request] fn add_todo(&mut self, user_id: Uuid, todo: Todo) -> bool { if let Some(user) = self.users.get_mut(&user_id) { self.wal.append_push_todo(user.uuid, todo.clone()); user.todos.push_back(todo); return true; } false } #[handle_request] fn create_user(&mut self, CreateUserDto { nickname, name }: CreateUserDto) -> Option { let user_uuid = Uuid::new_v4(); if self.users_nicknames.get(&nickname).is_some() { // user already exists return None; } let user = User { uuid: user_uuid, nickname: nickname.clone(), name, todos: VecDeque::new(), }; self.wal.append_new_user(&user); self.users_nicknames.insert(nickname, user_uuid); self.users.insert(user_uuid, user); Some(user_uuid) } #[handle_request] fn poll_todo(&mut self, user_id: Uuid) -> Option { if let Some(user) = self.users.get_mut(&user_id) { if let Some(front) = user.todos.front() { self.wal.append_poll_todo(user.uuid, front.uuid); } return user.todos.pop_front(); } None } #[handle_request] fn peek_todo(&mut self, user_id: Uuid) -> Option { if let Some(user) = self.users.get_mut(&user_id) { if let Some(f) = user.todos.front() { return Some(f.clone()); } } None } #[handle_request] fn list_todos(&mut self, user_id: Uuid) -> Vec { // self.todos_wal // .append_confirmation(message_uuid, pubrel.clone(), SystemTime::now()); if let Some(user) = self.users.get_mut(&user_id) { return user.todos.iter().cloned().collect(); } vec![] } } // ===================================== // DTOs // ===================================== #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Todo { uuid: Uuid, title: String, description: String, } #[derive(Serialize, Deserialize, Clone, Debug)] pub struct User { uuid: Uuid, nickname: String, name: String, todos: VecDeque, } #[derive(Serialize, Deserialize, Debug)] pub struct CreateUserDto { nickname: String, name: String, } #[derive(Serialize, Deserialize, Debug)] struct CreateTodoDto { title: String, description: String, } #[derive(Serialize, Deserialize, Debug)] struct CreateUserResponseDto { uuid: Uuid, } // routes logic fn create_user(user: Json) -> Json { let persistence = ProcessRef::::lookup(&"persistence").unwrap(); if let Some(uuid) = persistence.create_user(user.0) { return Json(CreateUserResponseDto { uuid }); } panic!("Cannot create user"); } fn list_todos(params: Params) -> Json> { let persistence = ProcessRef::::lookup(&PersistenceProcessID).unwrap(); let user_id = params.get("user_id").unwrap(); let todos = persistence.list_todos(Uuid::from_str(user_id).unwrap()); Json(todos) } fn poll_todo(params: Params) -> Json { let persistence = ProcessRef::::lookup(&PersistenceProcessID).unwrap(); let user_id = params.get("user_id").unwrap(); if let Some(todo) = persistence.poll_todo(Uuid::from_str(user_id).unwrap()) { return Json(todo); } panic!("Cannot poll todo {params:#?}"); } fn push_todo(params: Params, body: Json) -> Json> { let persistence = ProcessRef::::lookup(&PersistenceProcessID).unwrap(); let user_id = params.get("user_id").unwrap(); println!("RECEIVED BODY {body:?} | {user_id}"); let todo = Todo { uuid: Uuid::new_v4(), title: body.0.title, description: body.0.description, }; if persistence.add_todo(Uuid::from_str(user_id).unwrap(), todo.clone()) { return Json(Some(todo)); } Json(None) } fn liveness_check() -> &'static str { println!("Running liveness check"); r#"{"status":"UP"}"# } // has the prefix /api/mgmt const MGMT_ROUTER: Router = router! { GET "/alive" => liveness_check GET "/health" => liveness_check GET "/metrics" => liveness_check }; const ROUTER: Router = router! { with logging_middleware; "/api/users" => { POST "/" => create_user "/:user_id" => { GET "/todos" => list_todos POST "/todos" => push_todo POST "/todos/poll" => poll_todo } } "/api/mgmt" => MGMT_ROUTER() GET "/something_different/:shoppingcart_id" => liveness_check }; fn main() -> io::Result<()> { PersistenceSup::link() .start("persistence".to_owned()) .unwrap(); Application::new(ROUTER).serve("0.0.0.0:3000") }