use std::collections::HashSet; use std::iter::FromIterator; use std::sync::mpsc::channel; use std::time::Duration; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; use declarative_dataflow::binding::Binding; use declarative_dataflow::plan::Implementable; use declarative_dataflow::server::Server; use declarative_dataflow::{q, AttributeConfig, InputSemantics, Plan, Rule, TxData}; use declarative_dataflow::{Aid, Value}; use Value::{Eid, Number, String}; struct Case { description: &'static str, plan: Plan, transactions: Vec>, expectations: Vec, u64, isize)>>, } fn dependencies(case: &Case) -> HashSet { let mut deps = HashSet::new(); for binding in case.plan.into_bindings().iter() { if let Binding::Attribute(binding) = binding { deps.insert(binding.source_attribute.clone()); } } deps } fn run_cases(mut cases: Vec) { for case in cases.drain(..) { timely::execute_directly(move |worker| { let mut server = Server::::new(Default::default()); let (send_results, results) = channel(); dbg!(case.description); let mut deps = dependencies(&case); let plan = case.plan.clone(); for tx in case.transactions.iter() { for datum in tx { deps.insert(datum.2.clone()); } } worker.dataflow::(|scope| { for dep in deps.iter() { server .context .internal .create_transactable_attribute( dep, AttributeConfig::tx_time(InputSemantics::Raw), scope, ) .unwrap(); } server .test_single( scope, Rule { name: "query".to_string(), plan, }, ) .inner .sink(Pipeline, "Results", move |input| { input.for_each(|_time, data| { for datum in data.iter() { send_results.send(datum.clone()).unwrap() } }); }); }); let mut transactions = case.transactions.clone(); let mut next_tx = 0; for (tx_id, tx_data) in transactions.drain(..).enumerate() { next_tx += 1; server.transact(tx_data, 0, 0).unwrap(); server.advance_domain(None, next_tx).unwrap(); worker.step_while(|| server.is_any_outdated()); let mut expected: HashSet<(Vec, u64, isize)> = HashSet::from_iter(case.expectations[tx_id].iter().cloned()); for _i in 0..expected.len() { match results.recv_timeout(Duration::from_millis(400)) { Err(_err) => { panic!("No result."); } Ok(result) => { if !expected.remove(&result) { panic!("Unknown result {:?}.", result); } } } } match results.recv_timeout(Duration::from_millis(400)) { Err(_err) => {} Ok(result) => { panic!("Extraneous result {:?}", result); } } } }); } } #[test] fn not() { env_logger::init(); let data = vec![ TxData::add(1, ":name", String("Ivan".to_string())), TxData::add(1, ":age", Number(10)), TxData::add(2, ":name", String("Ivan".to_string())), TxData::add(2, ":age", Number(20)), TxData::add(3, ":name", String("Oleg".to_string())), TxData::add(3, ":age", Number(10)), TxData::add(4, ":name", String("Oleg".to_string())), TxData::add(4, ":age", Number(20)), TxData::add(5, ":name", String("Ivan".to_string())), TxData::add(5, ":age", Number(10)), TxData::add(6, ":name", String("Ivan".to_string())), TxData::add(6, ":age", Number(20)), ]; run_cases(vec![ Case { description: "[:find ?e :where [?e :name] (not [?e :name Ivan])]", plan: q( vec![0], vec![ Binding::attribute(0, ":name", 1), Binding::not(Binding::attribute(0, ":name", 2)), Binding::not(Binding::constant(2, String("Ivan".to_string()))), ], ), transactions: vec![data.clone()], expectations: vec![vec![(vec![Eid(3)], 0, 1), (vec![Eid(4)], 0, 1)]], }, Case { description: "[:find ?e :where [?e :name ?n] (not [?e :name Ivan] [?e :age 10])]", plan: q( vec![0], vec![ Binding::attribute(0, ":name", 1), Binding::not(Binding::constant(1, String("Ivan".to_string()))), ], ), transactions: vec![data.clone()], expectations: vec![vec![(vec![Eid(3)], 0, 1), (vec![Eid(4)], 0, 1)]], }, ]); }