mod common; use bytes::Bytes; use common::*; use futures::future::join_all; use http::StatusCode; use interledger_api::NodeStore; use interledger_service::{Account, AccountStore}; use interledger_settlement::core::{ idempotency::{IdempotentData, IdempotentStore}, types::{LeftoversStore, SettlementAccount, SettlementStore}, }; use interledger_store_redis::AccountId; use lazy_static::lazy_static; use num_bigint::BigUint; use redis::{aio::SharedConnection, cmd}; use url::Url; lazy_static! { static ref IDEMPOTENCY_KEY: String = String::from("AJKJNUjM0oyiAN46"); } #[test] fn saves_and_gets_uncredited_settlement_amount_properly() { block_on(test_store().and_then(|(store, context, _accs)| { let amounts = vec![ (BigUint::from(5u32), 11), // 5 (BigUint::from(855u32), 12), // 905 (BigUint::from(1u32), 10), // 1005 total ]; let acc = AccountId::new(); let mut f = Vec::new(); for a in amounts { let s = store.clone(); f.push(s.save_uncredited_settlement_amount(acc, a)); } join_all(f) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |_| { store .load_uncredited_settlement_amount(acc, 9) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |ret| { // 1 uncredited unit for scale 9 assert_eq!(ret, BigUint::from(1u32)); // rest should be in the leftovers store store .get_uncredited_settlement_amount(acc) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |ret| { // 1 uncredited unit for scale 9 assert_eq!(ret, (BigUint::from(5u32), 12)); let _ = context; Ok(()) }) }) }) })) .unwrap() } #[test] fn clears_uncredited_settlement_amount_properly() { block_on(test_store().and_then(|(store, context, _accs)| { let amounts = vec![ (BigUint::from(5u32), 11), // 5 (BigUint::from(855u32), 12), // 905 (BigUint::from(1u32), 10), // 1005 total ]; let acc = AccountId::new(); let mut f = Vec::new(); for a in amounts { let s = store.clone(); f.push(s.save_uncredited_settlement_amount(acc, a)); } join_all(f) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |_| { store .clear_uncredited_settlement_amount(acc) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |_| { store .get_uncredited_settlement_amount(acc) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |amount| { assert_eq!(amount, (BigUint::from(0u32), 0)); let _ = context; Ok(()) }) }) }) })) .unwrap() } #[test] fn credits_prepaid_amount() { block_on(test_store().and_then(|(store, context, accs)| { let id = accs[0].id(); context.async_connection().and_then(move |conn| { store .update_balance_for_incoming_settlement(id, 100, Some(IDEMPOTENCY_KEY.clone())) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |(_conn, (balance, prepaid_amount)): (_, (i64, i64))| { assert_eq!(balance, 0); assert_eq!(prepaid_amount, 100); let _ = context; Ok(()) }) }) }) })) .unwrap() } #[test] fn saves_and_loads_idempotency_key_data_properly() { block_on(test_store().and_then(|(store, context, _accs)| { let input_hash: [u8; 32] = Default::default(); store .save_idempotent_data( IDEMPOTENCY_KEY.clone(), input_hash, StatusCode::OK, Bytes::from("TEST"), ) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |_| { store .load_idempotent_data(IDEMPOTENCY_KEY.clone()) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |data1| { assert_eq!( data1.unwrap(), IdempotentData::new(StatusCode::OK, Bytes::from("TEST"), input_hash) ); let _ = context; store .load_idempotent_data("asdf".to_string()) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |data2| { assert!(data2.is_none()); let _ = context; Ok(()) }) }) }) })) .unwrap(); } #[test] fn idempotent_settlement_calls() { block_on(test_store().and_then(|(store, context, accs)| { let id = accs[0].id(); context.async_connection().and_then(move |conn| { store .update_balance_for_incoming_settlement(id, 100, Some(IDEMPOTENCY_KEY.clone())) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then(move |(conn, (balance, prepaid_amount)): (_, (i64, i64))| { assert_eq!(balance, 0); assert_eq!(prepaid_amount, 100); store .update_balance_for_incoming_settlement( id, 100, Some(IDEMPOTENCY_KEY.clone()), // Reuse key to make idempotent request. ) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| eprintln!("Redis error: {:?}", err)) .and_then( move |(_conn, (balance, prepaid_amount)): ( _, (i64, i64), )| { // Since it's idempotent there // will be no state update. // Otherwise it'd be 200 (100 + 100) assert_eq!(balance, 0); assert_eq!(prepaid_amount, 100); let _ = context; Ok(()) }, ) }) }) }) }) })) .unwrap() } #[test] fn credits_balance_owed() { block_on(test_store().and_then(|(store, context, accs)| { let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") .arg(format!("accounts:{}", id)) .arg("balance") .arg(-200) .query_async(conn) .map_err(|err| panic!(err)) .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| panic!(err)) .and_then( move |(_conn, (balance, prepaid_amount)): ( _, (i64, i64), )| { assert_eq!(balance, -100); assert_eq!(prepaid_amount, 0); let _ = context; Ok(()) }, ) }) }) }) })) .unwrap() } #[test] fn clears_balance_owed() { block_on(test_store().and_then(|(store, context, accs)| { let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") .arg(format!("accounts:{}", id)) .arg("balance") .arg(-100) .query_async(conn) .map_err(|err| panic!(err)) .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| panic!(err)) .and_then( move |(_conn, (balance, prepaid_amount)): ( _, (i64, i64), )| { assert_eq!(balance, 0); assert_eq!(prepaid_amount, 0); let _ = context; Ok(()) }, ) }) }) }) })) .unwrap() } #[test] fn clears_balance_owed_and_puts_remainder_as_prepaid() { block_on(test_store().and_then(|(store, context, accs)| { let id = accs[0].id(); context .shared_async_connection() .map_err(|err| panic!(err)) .and_then(move |conn| { cmd("HSET") .arg(format!("accounts:{}", id)) .arg("balance") .arg(-40) .query_async(conn) .map_err(|err| panic!(err)) .and_then(move |(conn, _balance): (SharedConnection, i64)| { store .update_balance_for_incoming_settlement( id, 100, Some(IDEMPOTENCY_KEY.clone()), ) .and_then(move |_| { cmd("HMGET") .arg(format!("accounts:{}", id)) .arg("balance") .arg("prepaid_amount") .query_async(conn) .map_err(|err| panic!(err)) .and_then( move |(_conn, (balance, prepaid_amount)): ( _, (i64, i64), )| { assert_eq!(balance, 0); assert_eq!(prepaid_amount, 60); let _ = context; Ok(()) }, ) }) }) }) })) .unwrap() } #[test] fn loads_globally_configured_settlement_engine_url() { block_on(test_store().and_then(|(store, context, accs)| { assert!(accs[0].settlement_engine_details().is_some()); assert!(accs[1].settlement_engine_details().is_none()); let account_ids = vec![accs[0].id(), accs[1].id()]; store .clone() .get_accounts(account_ids.clone()) .and_then(move |accounts| { assert!(accounts[0].settlement_engine_details().is_some()); assert!(accounts[1].settlement_engine_details().is_none()); store .clone() .set_settlement_engines(vec![ ( "ABC".to_string(), Url::parse("http://settle-abc.example").unwrap(), ), ( "XYZ".to_string(), Url::parse("http://settle-xyz.example").unwrap(), ), ]) .and_then(move |_| { store.get_accounts(account_ids).and_then(move |accounts| { // It should not overwrite the one that was individually configured assert_eq!( accounts[0] .settlement_engine_details() .unwrap() .url .as_str(), "http://settlement.example/" ); // It should set the URL for the account that did not have one configured assert!(accounts[1].settlement_engine_details().is_some()); assert_eq!( accounts[1] .settlement_engine_details() .unwrap() .url .as_str(), "http://settle-abc.example/" ); let _ = context; Ok(()) }) }) // store.set_settlement_engines }) })) .unwrap() }