use futures::{future::join_all, stream::*, sync::mpsc, Future}; use ilp_node::InterledgerNode; use serde_json::json; use tokio::runtime::Builder as RuntimeBuilder; use tracing::{debug, error_span}; use tracing_futures::Instrument; mod redis_helpers; use redis_helpers::*; mod test_helpers; use interledger::packet::Address; use interledger::stream::StreamDelivery; use std::str::FromStr; use test_helpers::*; const LOG_TARGET: &str = "interledger-tests-three-nodes"; #[test] fn three_nodes() { // Nodes 1 and 2 are peers, Node 2 is the parent of Node 3 install_tracing_subscriber(); let context = TestContext::new(); // Each node will use its own DB within the redis instance let mut connection_info1 = context.get_client_connection_info(); connection_info1.db = 1; let mut connection_info2 = context.get_client_connection_info(); connection_info2.db = 2; let mut connection_info3 = context.get_client_connection_info(); connection_info3.db = 3; let node1_http = get_open_port(Some(3010)); let node1_settlement = get_open_port(Some(3011)); let node2_http = get_open_port(Some(3020)); let node2_settlement = get_open_port(Some(3021)); let node3_http = get_open_port(Some(3030)); let node3_settlement = get_open_port(Some(3031)); let mut runtime = RuntimeBuilder::new() .panic_handler(|err| std::panic::resume_unwind(err)) .build() .unwrap(); let alice_on_alice = json!({ "ilp_address": "example.alice", "username": "alice_on_a", "asset_code": "XYZ", "asset_scale": 9, "ilp_over_http_incoming_token" : "default account holder", }); let bob_on_alice = json!({ "ilp_address": "example.bob", "username": "bob_on_a", "asset_code": "XYZ", "asset_scale": 9, "ilp_over_http_url": format!("http://localhost:{}/ilp", node2_http), "ilp_over_http_incoming_token" : "two", "ilp_over_http_outgoing_token" : "alice_on_b:one", "min_balance": -1_000_000_000, "routing_relation": "Peer", }); let alice_on_bob = json!({ "ilp_address": "example.alice", "username": "alice_on_b", "asset_code": "XYZ", "asset_scale": 9, "ilp_over_http_url": format!("http://localhost:{}/ilp", node1_http), "ilp_over_http_incoming_token" : "one", "ilp_over_http_outgoing_token" : "bob_on_a:two", "routing_relation": "Peer", }); let charlie_on_bob = json!({ "username": "charlie_on_b", "asset_code": "ABC", "asset_scale": 6, "ilp_over_btp_incoming_token" : "three", "ilp_over_http_incoming_token" : "three", "min_balance": -1_000_000_000, "routing_relation": "Child", }); let charlie_on_charlie = json!({ "username": "charlie_on_c", "asset_code": "ABC", "asset_scale": 6, "ilp_over_http_incoming_token" : "default account holder", }); let bob_on_charlie = json!({ "ilp_address": "example.bob", "username": "bob_on_c", "asset_code": "ABC", "asset_scale": 6, "ilp_over_http_incoming_token" : "two", "ilp_over_http_outgoing_token": "charlie_on_b:three", "ilp_over_http_url": format!("http://localhost:{}/ilp", node2_http), "ilp_over_btp_url": format!("btp+ws://localhost:{}/ilp/btp", node2_http), "ilp_over_btp_outgoing_token": "charlie_on_b:three", "min_balance": -1_000_000_000, "routing_relation": "Parent", }); let node1: InterledgerNode = serde_json::from_value(json!({ "ilp_address": "example.alice", "default_spsp_account": "alice_on_a", "admin_auth_token": "admin", "redis_connection": connection_info_to_string(connection_info1), "http_bind_address": format!("127.0.0.1:{}", node1_http), "settlement_api_bind_address": format!("127.0.0.1:{}", node1_settlement), "secret_seed": random_secret(), "route_broadcast_interval": Some(200), "exchange_rate_poll_interval": 60000, })) .expect("Error creating node1."); let node2: InterledgerNode = serde_json::from_value(json!({ "ilp_address": "example.bob", "admin_auth_token": "admin", "redis_connection": connection_info_to_string(connection_info2), "http_bind_address": format!("127.0.0.1:{}", node2_http), "settlement_api_bind_address": format!("127.0.0.1:{}", node2_settlement), "secret_seed": random_secret(), "route_broadcast_interval": Some(200), "exchange_rate_poll_interval": 60000, })) .expect("Error creating node2."); let node3: InterledgerNode = serde_json::from_value(json!({ "default_spsp_account": "charlie_on_c", "admin_auth_token": "admin", "redis_connection": connection_info_to_string(connection_info3), "http_bind_address": format!("127.0.0.1:{}", node3_http), "settlement_api_bind_address": format!("127.0.0.1:{}", node3_settlement), "secret_seed": random_secret(), "route_broadcast_interval": Some(200), "exchange_rate_poll_interval": 60000, })) .expect("Error creating node3."); let (finish_sender, finish_receiver) = mpsc::channel(0); let alice_fut = join_all(vec![ create_account_on_node(node1_http, alice_on_alice, "admin"), create_account_on_node(node1_http, bob_on_alice, "admin"), ]); let mut node1_finish_sender = finish_sender.clone(); runtime.spawn( node1 .serve() .and_then(move |_| alice_fut) .and_then(move |_| { node1_finish_sender .try_send(1) .expect("Could not send message from node_1"); Ok(()) }) .instrument(error_span!(target: "interledger", "node1")), ); let bob_fut = join_all(vec![ create_account_on_node(node2_http, alice_on_bob, "admin"), create_account_on_node(node2_http, charlie_on_bob, "admin"), ]); let mut node2_finish_sender = finish_sender; runtime.spawn( node2 .serve() .and_then(move |_| bob_fut) .and_then(move |_| { let client = reqwest::r#async::Client::new(); client .put(&format!("http://localhost:{}/rates", node2_http)) .header("Authorization", "Bearer admin") .json(&json!({"ABC": 1, "XYZ": 2})) .send() .map_err(|err| panic!(err)) .and_then(move |res| { res.error_for_status() .expect("Error setting exchange rates"); node2_finish_sender .try_send(2) .expect("Could not send message from node_2"); Ok(()) }) }) .instrument(error_span!(target: "interledger", "node2")), ); // We execute the futures one after the other to avoid race conditions where // Bob gets added before the node's main account let charlie_fut = create_account_on_node(node3_http, charlie_on_charlie, "admin") .and_then(move |_| create_account_on_node(node3_http, bob_on_charlie, "admin")); runtime .block_on( node3 .serve() .and_then(move |_| finish_receiver.collect()) .and_then(move |messages| { debug!( target: LOG_TARGET, "Received finish messages: {:?}", messages ); charlie_fut }) .instrument(error_span!(target: "interledger", "node3")) // we wait some time after the node is up so that we get the // necessary routes from bob .and_then(move |_| { delay(1000).map_err(|_| panic!("Something strange happened when `delay`")) }) .and_then(move |_| { let send_1_to_3 = send_money_to_username( node1_http, node3_http, 1000, "charlie_on_c", "alice_on_a", "default account holder", ); let send_3_to_1 = send_money_to_username( node3_http, node1_http, 1000, "alice_on_a", "charlie_on_c", "default account holder", ); let get_balances = move || { futures::future::join_all(vec![ get_balance("alice_on_a", node1_http, "admin"), get_balance("charlie_on_b", node2_http, "admin"), get_balance("charlie_on_c", node3_http, "admin"), ]) }; // Node 1 sends 1000 to Node 3. However, Node1's scale is 9, // while Node 3's scale is 6. This means that Node 3 will // see 1000x less. In addition, the conversion rate is 2:1 // for 3's asset, so he will receive 2 total. send_1_to_3 .map_err(|err| { eprintln!("Error sending from node 1 to node 3: {:?}", err); err }) .and_then(move |receipt: StreamDelivery| { debug!(target: LOG_TARGET, "send_1_to_3 receipt: {:?}", receipt); assert_eq!( receipt.from, Address::from_str("example.alice").unwrap(), "Payment receipt incorrect (1)" ); assert!(receipt .to .to_string() .starts_with("example.bob.charlie_on_b.charlie_on_c.")); assert_eq!(receipt.sent_asset_code, "XYZ"); assert_eq!(receipt.sent_asset_scale, 9); assert_eq!(receipt.sent_amount, 1000); assert_eq!(receipt.delivered_asset_code.unwrap(), "ABC"); assert_eq!(receipt.delivered_amount, 2); assert_eq!(receipt.delivered_asset_scale.unwrap(), 6); get_balances().and_then(move |ret| { assert_eq!(ret[0], -1000); assert_eq!(ret[1], 2); assert_eq!(ret[2], 2); Ok(()) }) }) .and_then(move |_| { send_3_to_1.map_err(|err| { eprintln!("Error sending from node 3 to node 1: {:?}", err); err }) }) .and_then(move |receipt| { debug!(target: LOG_TARGET, "send_3_to_1 receipt: {:?}", receipt); assert_eq!( receipt.from, Address::from_str("example.bob.charlie_on_b.charlie_on_c").unwrap(), "Payment receipt incorrect (2)" ); assert!(receipt.to.to_string().starts_with("example.alice")); assert_eq!(receipt.sent_asset_code, "ABC"); assert_eq!(receipt.sent_asset_scale, 6); assert_eq!(receipt.sent_amount, 1000); assert_eq!(receipt.delivered_asset_code.unwrap(), "XYZ"); assert_eq!(receipt.delivered_amount, 500_000); assert_eq!(receipt.delivered_asset_scale.unwrap(), 9); get_balances().and_then(move |ret| { assert_eq!(ret[0], 499_000); assert_eq!(ret[1], -998); assert_eq!(ret[2], -998); Ok(()) }) }) }), ) .map_err(|err| { eprintln!("Error executing tests: {:?}", err); err }) .unwrap(); }