use crate::eresult::EResult; use crate::message::NetMessage; use crate::proto::steammessages_base::CMsgProtoBufHeader; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::{Buf, BufMut, BytesMut}; use protobuf::{Enum, Message}; use std::borrow::Cow; use std::fmt::Debug; use std::io::{Cursor, Seek, SeekFrom}; use steam_vent_crypto::CryptError; use steam_vent_proto::enums_clientserver::EMsg; use steamid_ng::SteamID; use thiserror::Error; use tracing::{debug, trace}; pub const PROTO_MASK: u32 = 0x80000000; #[derive(Debug, Error)] pub enum NetworkError { #[error("{0}")] IO(#[from] std::io::Error), #[error("{0}")] Ws(#[from] tokio_tungstenite::tungstenite::Error), #[error("Invalid message header")] InvalidHeader, #[error("Invalid message kind {0}")] InvalidMessageKind(i32), #[error("Failed to perform crypto handshake")] CryptoHandshakeFailed, #[error("Different message expected, expected {0:?}, got {1:?}")] DifferentMessage(EMsg, EMsg), #[error("Different service method expected, expected {0:?}, got {1:?}")] DifferentServiceMethod(&'static str, String), #[error("{0}")] MalformedBody(#[from] crate::message::MalformedBody), #[error("Crypto error: {0}")] CryptoError(#[from] CryptError), #[error("Unexpected end of stream")] EOF, #[error("Response timed out")] Timeout, #[error("Remote returned an error code: {0:?}")] ApiError(EResult), } impl From for NetworkError { fn from(value: EResult) -> Self { NetworkError::ApiError(value) } } pub type Result = std::result::Result; #[derive(Debug, Default, Clone)] pub struct NetMessageHeader { pub source_job_id: u64, pub target_job_id: u64, pub steam_id: SteamID, pub session_id: i32, pub target_job_name: Option>, pub result: Option, } impl From for NetMessageHeader { fn from(header: CMsgProtoBufHeader) -> Self { NetMessageHeader { source_job_id: header.jobid_source(), target_job_id: header.jobid_target(), steam_id: header.steamid().into(), session_id: header.client_sessionid(), target_job_name: header .has_target_job_name() .then(|| header.target_job_name().to_string().into()), result: header.eresult, } } } impl NetMessageHeader { fn read( mut reader: R, kind: EMsg, is_protobuf: bool, ) -> Result<(Self, usize)> { if is_protobuf { let header_length = reader.read_u32::()?; trace!("reading protobuf header of {} bytes", header_length); let header = if header_length > 0 { let mut bytes = vec![0; header_length as usize]; let num = reader.read(&mut bytes)?; CMsgProtoBufHeader::parse_from_bytes(&bytes[0..num]) .map_err(|_| NetworkError::InvalidHeader)? .into() } else { NetMessageHeader::default() }; Ok((header, 8 + header_length as usize)) } else if kind == EMsg::k_EMsgChannelEncryptRequest || kind == EMsg::k_EMsgChannelEncryptResult { let target_job_id = reader.read_u64::()?; let source_job_id = reader.read_u64::()?; Ok(( NetMessageHeader { target_job_id, source_job_id, session_id: 0, steam_id: SteamID::default(), ..NetMessageHeader::default() }, 4 + 8 + 8, )) } else { reader.seek(SeekFrom::Current(3))?; // 1 byte (fixed) header size, 2 bytes (fixed) header version let target_job_id = reader.read_u64::()?; let source_job_id = reader.read_u64::()?; reader.seek(SeekFrom::Current(1))?; // header canary (fixed) let steam_id = reader.read_u64::()?.into(); let session_id = reader.read_i32::()?; Ok(( NetMessageHeader { source_job_id, target_job_id, steam_id, session_id, target_job_name: None, result: None, }, 4 + 3 + 8 + 8 + 1 + 8 + 4, )) } } pub(crate) fn write( &self, writer: &mut W, kind: EMsg, proto: bool, ) -> std::io::Result<()> { if kind == EMsg::k_EMsgChannelEncryptResponse { writer.write_u32::(kind.value() as u32)?; } else if proto { trace!("writing header for {:?} protobuf message: {:?}", kind, self); let proto_header = self.proto_header(kind); writer.write_u32::(kind.value() as u32 | PROTO_MASK)?; writer.write_u32::(proto_header.compute_size() as u32)?; proto_header.write_to_writer(writer)?; } else { trace!("writing header for {:?} message: {:?}", kind, self); writer.write_u32::(kind.value() as u32)?; writer.write_u8(32)?; writer.write_u16::(2)?; writer.write_u64::(self.target_job_id)?; writer.write_u64::(self.source_job_id)?; writer.write_u8(239)?; writer.write_u64::(self.steam_id.into())?; writer.write_i32::(self.session_id)?; } Ok(()) } fn proto_header(&self, kind: EMsg) -> CMsgProtoBufHeader { let mut proto_header = CMsgProtoBufHeader::new(); proto_header.set_steamid( if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed { 0 } else { self.steam_id.into() }, ); proto_header.set_client_sessionid(self.session_id); if kind == EMsg::k_EMsgServiceMethodCallFromClientNonAuthed || kind == EMsg::k_EMsgServiceMethodCallFromClient { proto_header.set_realm(1); proto_header.set_jobid_source(self.source_job_id); if self.source_job_id > 0 { proto_header.set_jobid_target(self.target_job_id); } } if let Some(target_job_name) = self.target_job_name.as_deref() { proto_header.set_target_job_name(target_job_name.into()); } proto_header } fn encode_size(&self, kind: EMsg, proto: bool) -> usize { if kind == EMsg::k_EMsgChannelEncryptResponse { 4 } else if proto { let proto_header = self.proto_header(kind); 4 + 4 + proto_header.compute_size() as usize } else { 4 + 1 + 2 + 8 + 8 + 1 + 8 + 4 + 4 } } } #[derive(Debug, Clone)] pub struct RawNetMessage { pub kind: EMsg, pub is_protobuf: bool, pub header: NetMessageHeader, pub data: BytesMut, pub(crate) frame_header_buffer: Option, pub(crate) iv_buffer: Option, pub(crate) header_buffer: BytesMut, } impl RawNetMessage { pub fn read(mut value: BytesMut) -> Result { let mut reader = Cursor::new(&value); let kind = reader .read_i32::() .map_err(|_| NetworkError::InvalidHeader)?; let is_protobuf = kind < 0; let kind = kind & (!PROTO_MASK) as i32; let kind = match EMsg::from_i32(kind) { Some(kind) => kind, None => return Err(NetworkError::InvalidMessageKind(kind)), }; trace!( "reading header for {:?} {}message", kind, if is_protobuf { "protobuf " } else { "" } ); let header_start = reader.position() as usize; let (header, body_start) = NetMessageHeader::read(&mut reader, kind, is_protobuf)?; value.advance(header_start); let header_buffer = value.split_to(body_start - header_start); Ok(RawNetMessage { kind, is_protobuf, header, data: value, frame_header_buffer: None, iv_buffer: None, header_buffer, }) } pub fn from_message(header: NetMessageHeader, message: T) -> Result { Self::from_message_with_kind(header, message, T::KIND) } pub fn from_message_with_kind( mut header: NetMessageHeader, message: T, kind: EMsg, ) -> Result { debug!("writing raw {:?} message", kind); message.process_header(&mut header); let body_size = message.encode_size(); // allocate the buffer with extra bytes and split those off // this allows later re-joining the bytes and use the space for the frame header and iv // without having to copy the message again // // 8 byte frame header, 16 byte iv, header, body, 16 byte encryption padding let mut buff = BytesMut::with_capacity( 8 + 16 + header.encode_size(kind, T::IS_PROTOBUF) + body_size + 16, ); buff.extend([0; 8 + 16]); let frame_header_buffer = buff.split_to(8); let iv_buffer = buff.split_to(16); { let mut writer = (&mut buff).writer(); header.write(&mut writer, kind, T::IS_PROTOBUF)?; } let header_buffer = buff.split(); let mut writer = (&mut buff).writer(); message.write_body(&mut writer)?; trace!("encoded body({} bytes): {:?}", buff.len(), buff.as_ref()); Ok(RawNetMessage { kind, is_protobuf: T::IS_PROTOBUF, header, data: buff, frame_header_buffer: Some(frame_header_buffer), iv_buffer: Some(iv_buffer), header_buffer, }) } } impl RawNetMessage { pub fn into_message(self) -> Result { if let Some(result) = self.header.result { EResult::from_result(result)?; } if self.kind == T::KIND { trace!( "reading body of {:?} message({} bytes)", self.kind, self.data.len() ); let body = T::read_body(self.data, &self.header)?; Ok(body) } else { Err(NetworkError::DifferentMessage(T::KIND, self.kind)) } } }