//! This module contains the implementation of a ISO server (TCP) use std::io::{BufReader, Read, Write}; use std::net::{SocketAddr, TcpStream, ToSocketAddrs}; use std::sync::Arc; use std::thread::JoinHandle; use witchcraft_metrics::{Meter, ExponentiallyDecayingReservoir, Histogram}; use hexdump::hexdump_iter; use crate::iso8583::IsoError; use crate::iso8583::iso_spec::{IsoMsg, Spec}; use crate::iso8583::mli::{MLI, MLI2E, MLI2I, MLI4E, MLI4I, MLIType}; /// This struct represents an error associated with server errors pub struct IsoServerError { pub msg: String } /// This struct represents a IsoServer pub struct ISOServer { /// The listen address for this server sock_addr: Vec, pub(crate) mli: Arc>, /// The specification associated with the server pub spec: &'static crate::iso8583::iso_spec::Spec, /// The message processor to be used to handle incoming requests pub(crate) msg_processor: Arc>, txn_rate_metric: Meter, response_time_metric: witchcraft_metrics::Histogram, } /// This trait whose implementation is used by the IsoServer to handle incoming requests pub trait MsgProcessor: Send + Sync { fn process(&self, iso_server: &ISOServer, msg: &mut Vec) -> Result<(Vec, IsoMsg), IsoError>; } impl ISOServer { /// Returns a new ISO server on success or a IsoServer if the provided addr is incorrect pub fn new<'a>(host_port: String, spec: &'static Spec, mli_type: MLIType, msg_processor: Box) -> Result { let mli: Arc>; match mli_type { MLIType::MLI2E => { mli = Arc::new(Box::new(MLI2E {})); } MLIType::MLI2I => { mli = Arc::new(Box::new(MLI2I {})); } MLIType::MLI4E => { mli = Arc::new(Box::new(MLI4E {})); } MLIType::MLI4I => { mli = Arc::new(Box::new(MLI4I {})); } } match host_port.to_socket_addrs() { Ok(addrs) => { let addrs = addrs.as_slice(); //use only ipv4 for now let addrs = addrs.iter().filter(|s| s.is_ipv4()).map(|s| *s).collect::>(); if addrs.len() > 0 { Ok(ISOServer { sock_addr: addrs, spec, mli, msg_processor: Arc::new(msg_processor), txn_rate_metric: Meter::new(), response_time_metric: Histogram::new(ExponentiallyDecayingReservoir::new()), }) } else { Err(IsoServerError { msg: format!("invalid host_port: {} : unresolvable?", &host_port) }) } } Err(e) => Err(IsoServerError { msg: format!("invalid host_port: {}: cause: {}", &host_port, e.to_string()) }) } } // Returns the meter for transaction count metric pub fn txn_rate_metric(&self) -> &Meter { &self.txn_rate_metric } // Returns the histogram metric for response time pub fn response_time_metric(&self) -> &Histogram { &self.response_time_metric } /// Starts the server in a separate thread pub fn start(&self) -> JoinHandle<()> { let server = ISOServer { sock_addr: self.sock_addr.clone(), spec: self.spec, mli: self.mli.clone(), msg_processor: self.msg_processor.clone(), txn_rate_metric: Meter::new(), response_time_metric: Histogram::new(ExponentiallyDecayingReservoir::new()), }; std::thread::spawn(move || { let listener = std::net::TcpListener::bind(server.sock_addr.as_slice()).unwrap(); for stream in listener.incoming() { let client = stream.unwrap(); debug!("Accepted new connection .. {:?}", &client.peer_addr()); new_client(&server, client); } }) } } /// Runs a new thread to handle a new incoming connection fn new_client(iso_server: &ISOServer, stream_: TcpStream) { let server = ISOServer { sock_addr: iso_server.sock_addr.clone(), spec: iso_server.spec, mli: iso_server.mli.clone(), msg_processor: iso_server.msg_processor.clone(), txn_rate_metric: Meter::new(), response_time_metric: Histogram::new(ExponentiallyDecayingReservoir::new()), }; std::thread::spawn(move || { let stream = stream_; let mut reading_mli = true; let mut mli: u32 = 0; let mut reader = BufReader::with_capacity(8192, &stream); let mut writer: Box = Box::new(&stream); let mut t1 = std::time::Instant::now(); 'done: loop { if reading_mli { match server.mli.parse(&mut reader) { Ok(n) => { mli = n; t1 = std::time::Instant::now(); reading_mli = false; } Err(e) => { error!("client socket_err: {} {}", &stream.peer_addr().unwrap().to_string(), e.msg); break 'done; } }; } else { if mli > 0 { let mut data = vec![0; mli as usize]; debug!("reading data for mli {} ", mli); match reader.read_exact(&mut data[..]) { Err(e) => { error!("client socket_err: {} {}", stream.peer_addr().unwrap().to_string(), e.to_string()); break 'done; } _ => (), }; mli = 0; reading_mli = true; debug!("received request: \n{}\n len = {}", get_hexdump(&data), mli); match server.msg_processor.process(&server, &mut data) { Ok(resp) => { debug!("iso_response : {} \n parsed :\n --- {} \n --- \n", get_hexdump(&resp.0), resp.1); match server.mli.create(&(resp.0).len()) { Ok(mut resp_data) => { (&mut resp_data).write_all(resp.0.as_slice()).unwrap(); writer.write_all(resp_data.as_slice()).unwrap(); writer.flush().unwrap(); debug!("request processing time = {} millis", std::time::Instant::now().duration_since(t1).as_millis()); } Err(e) => { error!("failed to construct mli {}", e.msg) } } } Err(e) => { error!("failed to handle incoming req - {}", e.msg) } } } } } }); } pub(in crate::iso8583) fn get_hexdump(data: &Vec) -> String { let mut hexdmp = String::new(); hexdmp.push_str("\n"); hexdump_iter(data).for_each(|f| { hexdmp.push_str(f.as_ref()); hexdmp.push_str("\n"); }); hexdmp }