// Copyright (C) 2023 Entropy Cryptography Inc. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . //! A simple protocol server, like a mini version of entropy-tss, for benchmarking use anyhow::{anyhow, ensure}; use entropy_protocol::{ execute_protocol::{execute_dkg, execute_reshare, execute_signing_protocol, Channels}, protocol_transport::{ errors::WsError, noise::{noise_handshake_initiator, noise_handshake_responder}, ws_to_channels, SubscribeMessage, WsChannels, }, KeyParams, KeyShareWithAuxInfo, Listener, PartyId, RecoverableSignature, SessionId, ValidatorInfo, }; use entropy_shared::X25519PublicKey; use futures::future; use sp_core::{sr25519, Pair}; use std::{ collections::BTreeSet, fmt, sync::{Arc, Mutex}, time::Duration, }; use subxt::utils::AccountId32; use synedrion::{AuxInfo, KeyResharingInputs, KeyShare, NewHolder, OldHolder, ThresholdKeyShare}; use tokio::{ net::{TcpListener, TcpStream}, time::timeout, }; use tokio_tungstenite::connect_async; use x25519_dalek::StaticSecret; /// Internal state used by test server #[derive(Clone)] struct ServerState { x25519_secret_key: StaticSecret, listener: Arc>>, } /// Output of a successful protocol run #[derive(Clone)] pub enum ProtocolOutput { Sign(RecoverableSignature), Reshare(ThresholdKeyShare), Dkg(KeyShareWithAuxInfo), } impl fmt::Debug for ProtocolOutput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Success") } } /// A websocket server handling a single test protocol session pub async fn server( socket: TcpListener, validators_info: Vec, pair: sr25519::Pair, x25519_secret_key: StaticSecret, session_id: SessionId, keyshare: Option>, threshold_keyshare: Option>, aux_info: Option>, threshold: usize, ) -> anyhow::Result { let account_id = AccountId32(pair.public().0); // Setup a single listener for tracking connnections to the other parties let (rx_ready, rx_from_others, listener) = Listener::new(validators_info.clone(), &account_id); let state = ServerState { listener: Arc::new(Mutex::new(vec![listener])), x25519_secret_key: x25519_secret_key.clone(), }; // Handle each incoming connection in a separate task let state_clone = state.clone(); tokio::spawn(async move { while let Ok((stream, _address)) = socket.accept().await { let state_clone2 = state_clone.clone(); tokio::spawn(async move { if let Err(e) = handle_connection(state_clone2, stream).await { tracing::warn!("Error when handling ws connection {}", e); }; }); } }); // Make outgoing connections open_protocol_connections(&validators_info, &session_id, &pair, &x25519_secret_key, &state) .await?; // Wait for other parties to connect let channels = { let ready = timeout(Duration::from_secs(10), rx_ready).await?; let broadcast_out = ready??; Channels(broadcast_out, rx_from_others) }; let tss_accounts: Vec = validators_info.iter().map(|validator_info| validator_info.tss_account.clone()).collect(); match session_id.clone() { SessionId::Sign(session_info) => { let rsig = execute_signing_protocol( session_id, channels, &keyshare.unwrap(), &aux_info.unwrap(), &session_info.message_hash, &pair, tss_accounts, ) .await?; let (signature, recovery_id) = rsig.to_backend(); Ok(ProtocolOutput::Sign(RecoverableSignature { signature, recovery_id })) }, SessionId::Reshare { .. } => { let old_key = threshold_keyshare.unwrap(); let party_ids: BTreeSet = tss_accounts.iter().cloned().map(PartyId::new).collect(); let inputs = KeyResharingInputs { old_holder: Some(OldHolder { key_share: old_key.clone() }), new_holder: Some(NewHolder { verifying_key: old_key.verifying_key(), old_threshold: party_ids.len(), old_holders: party_ids.clone(), }), new_holders: party_ids.clone(), new_threshold: old_key.threshold(), }; let new_keyshare = execute_reshare(session_id, channels, &pair, inputs, None).await?; Ok(ProtocolOutput::Reshare(new_keyshare.0)) }, SessionId::Dkg { .. } => { let keyshare_and_aux_info = execute_dkg(session_id, channels, &pair, tss_accounts, threshold).await?; Ok(ProtocolOutput::Dkg(keyshare_and_aux_info)) }, } } /// Handle an incoming websocket connection async fn handle_connection(state: ServerState, raw_stream: TcpStream) -> anyhow::Result<()> { let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; let (mut encrypted_connection, serialized_signed_message) = noise_handshake_responder(ws_stream, &state.x25519_secret_key).await?; let remote_public_key = encrypted_connection.remote_public_key()?; let (subscribe_response, ws_channels_option) = match handle_initial_incoming_ws_message( serialized_signed_message, remote_public_key, state, ) .await { Ok((ws_channels, party_id)) => (Ok(()), Some((ws_channels, party_id))), Err(err) => (Err(format!("{err:?}")), None), }; // Send them a response as to whether we are happy with their subscribe message let subscribe_response_vec = bincode::serialize(&subscribe_response)?; encrypted_connection.send(subscribe_response_vec).await?; // If it was successful, proceed with relaying signing protocol messages let (ws_channels, remote_party_id) = ws_channels_option.ok_or(WsError::BadSubscribeMessage)?; ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await?; Ok(()) } /// Handle a subscribe message (first message sent by the initiator of the connection) async fn handle_initial_incoming_ws_message( serialized_subscribe_message: Vec, _remote_public_key: X25519PublicKey, state: ServerState, ) -> anyhow::Result<(WsChannels, PartyId)> { let msg: SubscribeMessage = bincode::deserialize(&serialized_subscribe_message)?; tracing::info!("Got ws connection, with subscribe message: {msg:?}"); ensure!(msg.verify()?, "Invalid signature"); let ws_channels = get_ws_channels(&msg.session_id, &msg.account_id(), &state)?; Ok((ws_channels, PartyId::new(msg.account_id()))) } /// Inform the listener we have made a ws connection to another signing party, and get channels to /// the signing protocol fn get_ws_channels( _session_id: &SessionId, tss_account: &AccountId32, state: &ServerState, ) -> anyhow::Result { let mut listeners = state.listener.lock().unwrap(); let listener = listeners.get_mut(0).ok_or(anyhow::anyhow!("No listener"))?; let ws_channels = listener.subscribe(tss_account)?; if ws_channels.is_final { let listener = listeners.pop().ok_or(anyhow::anyhow!("No listener"))?; // all subscribed, wake up the waiting listener to execute the protocol let (tx, broadcaster) = listener.into_broadcaster(); let _ = tx.send(Ok(broadcaster)); }; Ok(ws_channels) } /// Set up outgoing websocket connections to other parties async fn open_protocol_connections( validators_info: &[ValidatorInfo], session_id: &SessionId, signer: &sr25519::Pair, x25519_secret_key: &x25519_dalek::StaticSecret, state: &ServerState, ) -> anyhow::Result<()> { let connect_to_validators = validators_info .iter() .filter(|validator_info| { // Decide whether to initiate a connection by comparing account IDs // otherwise, we wait for them to connect to us signer.public().0 > validator_info.tss_account.0 }) .map(|validator_info| async move { // Open a ws connection let ws_endpoint = format!("ws://{}/ws", validator_info.ip_address); let (ws_stream, _response) = connect_async(ws_endpoint).await?; // Send a SubscribeMessage in the payload of the final handshake message let subscribe_message_vec = bincode::serialize(&SubscribeMessage::new(session_id.clone(), signer)?)?; let mut encrypted_connection = noise_handshake_initiator( ws_stream, x25519_secret_key, validator_info.x25519_public_key, subscribe_message_vec, ) .await?; // Check the response as to whether they accepted our SubscribeMessage let response_message = encrypted_connection.recv().await?; let subscribe_response: Result<(), String> = bincode::deserialize(&response_message)?; if let Err(error_message) = subscribe_response { return Err(anyhow!(error_message)); } // Setup channels let ws_channels = get_ws_channels(session_id, &validator_info.tss_account, state)?; let remote_party_id = PartyId::new(validator_info.tss_account.clone()); // Handle protocol messages tokio::spawn(async move { if let Err(err) = ws_to_channels(encrypted_connection, ws_channels, remote_party_id).await { tracing::warn!("{:?}", err); }; }); Ok(()) }) .collect::>(); future::try_join_all(connect_to_validators).await?; Ok(()) }