#[macro_use] extern crate log; extern crate clap; extern crate pretty_env_logger; extern crate rayon; extern crate cxmr_candles; extern crate cxmr_feeds; extern crate cxmr_snapshots; extern crate cxmr_tectonic; use std::time::Instant; use clap::{App, Arg}; use rayon::prelude::*; use cxmr_candles::round_open; use cxmr_feeds::EventData; use cxmr_snapshots::{Snapshot, SnapshotCollector}; use cxmr_tectonic::{create_dtf, glob_dtf, read_dtf}; use itertools::Itertools; fn main() { pretty_env_logger::init(); let matches = App::new("cxmr-snapshot") .arg( Arg::with_name("interval") .short("i") .long("interval") .help("Snapshot interval in seconds") .takes_value(true) .required(true), ) .arg( Arg::with_name("directory") .short("d") .long("directory") .help("Snapshot directory") .takes_value(true), ) .arg( Arg::with_name("file") .short("f") .long("file") .help("Snapshot file") .takes_value(true), ) .get_matches(); let interval = matches .value_of("interval") .unwrap() .parse::() .unwrap() * 1000; let dirs = match matches.value_of("directory") { Some(path) => glob_dtf(path), None => vec![matches.value_of("file").unwrap().into()], }; let total = Instant::now(); // dirs.into_par_iter().for_each(|path| { dirs.into_iter().for_each(|path| { let feed = read_dtf(&path).unwrap(); let now = Instant::now(); let snapshot = SnapshotCollector::new(interval) .consume(feed.events.as_ref().unwrap()) .unwrap(); let hours = snapshot.duration().unwrap() / 1000 / 60 / 60; info!( "Total size: {} time: {:.2} days processing time: {:?}", snapshot.len(), hours / 24, now.elapsed() ); let now = Instant::now(); snapshot .states .into_iter() .group_by(|state| round_open(state.time() / 1000, 60 * 60)) .into_iter() .map(|(key, group)| (key, group.collect::>())) .collect::>() .into_par_iter() .for_each(|(key, states)| { let day = round_open(key, 24 * 60 * 60); let dir_path = std::path::Path::new(&path) .parent() .unwrap() .join(feed.metadata.market.as_str()) .join(&format!("{}", day)); std::fs::create_dir_all(&dir_path).unwrap(); let fname = format!("{}.dtf", key); let path = std::path::Path::new(&dir_path).join(&fname); let snapshot = Snapshot { market: feed.metadata.market.clone().into(), start_time: states.first().map(|s| s.time()), interval, states, }; let events: Vec = (&snapshot).into(); create_dtf(&path, feed.metadata.market.as_str(), &(events)).unwrap(); }); info!("Writing time: {:?}", now.elapsed()); }); info!("Total writing time: {:?}", total.elapsed()); }