use crate::io; use crate::Config; // use crate::map::init as MapSpawnInit; // use crate::disk::init as DiskSpawnInit; use std::fs::Metadata; use gzb_binary_69::{Reader}; use tokio::fs::File; use tokio::spawn as TokioSpawn; use flume::Sender as FlumeSender; // use flume::Receiver as FlumeReceiver; use crate::workers::{u64_from_bytes}; use crate::workers::{Signal,Pointer}; // use tokio::sync::{Notify,Mutex}; use std::sync::Arc; use crate::config::{ MapConfig,MapMessage, // MapAddMessage,MapGetMessage,MapRemoveMessage, LocatorMessage,LocatorNext,LocatorAdd,LocatorRemove,LocatorReset // DiskConfig, }; use flume::unbounded; use std::collections::HashMap; use futures::future::join_all; use crate::response::QueResponse; // use tokio::runtime::Builder as TokioRuntimeBuilder; #[derive(Debug,Clone)] pub struct Que{ locator_sender:Arc> } const MB1:u64 = 10_000; impl Que{ pub async fn new(c:Config)->Result{ // println!("que config : {:?}",c); let (locator_sender,locator_receiver) = unbounded(); let mut locator:HashMap = HashMap::new(); let mut all_items = Vec::new(); let mut map_index:u8 = 0; let mut collect_map_initiaters = Vec::new(); for path in c.files.iter(){ map_index += 1; collect_map_initiaters.push(init_map( map_index, path.clone(), locator_sender.clone(), c.num_of_writers.clone(), c.min_que_size.clone(), c.expand_size.clone() )); } let mut map_senders = HashMap::new(); for result in join_all(collect_map_initiaters).await{ match result{ Ok((index,mut items,map_sender))=>{ for item in items.iter(){ locator.insert(item.clone(),index); } all_items.append(&mut items); map_senders.insert(index,map_sender); }, Err(_e)=>{ return Err(_e); } } } all_items.sort(); // println!("{:?}",all_items.len()); TokioSpawn(async move{ crate::locator::init( &mut map_senders, &mut all_items, &mut locator, locator_receiver, map_index ).await; }); return Ok(Que{ locator_sender:Arc::new(locator_sender) }); } pub async fn add(&mut self,value:Vec)->Result{ let (signal,sleeper) = Signal::new(); match self.locator_sender.send_async( LocatorMessage::Add(LocatorAdd{ value:value, signal:signal.clone(), }) ).await{ Ok(_)=>{ return Ok(QueResponse::new(signal,sleeper)); }, Err(_)=>{ return Err(()); } } } pub async fn next(&mut self)->Result{ let (signal,sleeper) = Signal::new(); match self.locator_sender.send_async( LocatorMessage::Next(LocatorNext{ signal:signal.clone(), }) ).await{ Ok(_)=>{ return Ok(QueResponse::new(signal,sleeper)); }, Err(_)=>{ return Err(()); } } } pub async fn remove(&mut self,pointer:Pointer)->Result{ let (signal,sleeper) = Signal::new(); match self.locator_sender.send_async( LocatorMessage::Remove(LocatorRemove{ pointer:pointer, signal:signal.clone(), }) ).await{ Ok(_)=>{ return Ok(QueResponse::new(signal,sleeper)); }, Err(_)=>{ return Err(()); } } } pub async fn reset(&mut self,pointer:Pointer)->Result{ let (signal,sleeper) = Signal::new(); match self.locator_sender.send_async( LocatorMessage::Reset(LocatorReset{ pointer:pointer, signal:signal.clone(), }) ).await{ Ok(_)=>{ return Ok(QueResponse::new(signal,sleeper)); }, Err(_)=>{ return Err(()); } } } } async fn init_map( index:u8, path:String, locator_sender:FlumeSender, num_of_writers:u8, min_que_size:u64, expand_size:u64 )->Result<(u8,Vec,FlumeSender),&'static str>{ //ensure file let metadata:Metadata; let mut file:File; match io::init_map(path.clone(),min_que_size).await{ Ok(v)=>{ file = v.0; metadata = v.1; }, Err(_e)=>{ //debug_error("failed-init-map-que.rs",ERROR); println!("!!! failed-init-map-que : {:?}",_e); return Err("failed-init-map"); } } let reader:Reader; let items:Vec; match build_map(metadata,&mut file).await{ Ok(r)=>{ reader = r.0; items = r.1; // return Ok((index,r.0,r.1)); }, Err(_e)=>{ //debug_error("failed-build-map-que.rs",ERROR); println!("!!! failed-build-map : {:?}",_e); return Err("failed-build-map"); } } // println!("{:?}",reader.corrupt); let (build_map_config,map_sender) = MapConfig::new( index.clone(), path, reader, locator_sender, num_of_writers, min_que_size, expand_size ); TokioSpawn(async move{ crate::map::init(build_map_config).await; }); // return Err("no_error"); return Ok((index,items,map_sender)); } async fn build_map(metadata:Metadata,file:&mut File)->Result<(Reader,Vec),&'static str>{ //meta info let mut len:u64 = metadata.len();//bytes let chunk_size:u64; //bytes if len > MB1{ chunk_size = MB1; } else { chunk_size = len; } //make map let mut reader:Reader = Reader::with_capacity(1000000, 1000000); //read chunks let mut index = 0; let mut read_buffer:Vec = Vec::with_capacity(chunk_size as usize); loop{ if len == 0{ break; } let start_at = index*chunk_size; let read_len:u64; if len < chunk_size{read_len = len;} else {read_len = chunk_size;} match io::read_chunk(file, &mut read_buffer, start_at, read_len).await{ Ok(_)=>{ reader.map(&mut read_buffer); }, Err(_)=>{ return Err("failed-read_chunk"); } } if len > chunk_size{ len -= chunk_size; } else { len = 0; } index += 1; } match reader.end(){ Ok(_)=>{}, Err(_)=>{} } let mut collect:Vec = Vec::with_capacity(reader.pointers.len()); for key in reader.pointers.keys(){ match u64_from_bytes(&key){ Ok(num)=>{ collect.push(num); }, Err(_)=>{} } } collect.sort(); return Ok((reader,collect)); }