use bytes::Bytes; use crate::{ body::{FromBody, IntoBody}, codec::MessageCodec, http::{RequestOpts, ResponseInfo}, metadata::Metadata, status::{Code, Status, StatusError}, }; pub struct Client { base_url: reqwest::Url, http: reqwest::Client, } impl Client { pub fn new(base_url: &str) -> anyhow::Result { Ok(Self { base_url: base_url.parse()?, http: Default::default(), }) } pub async fn unary<'a, Req, Resp>( &self, path: impl AsRef, msg: Req, opts: RequestOpts, ) -> anyhow::Result<(Resp, ResponseInfo)> where Req: IntoBody, Req::Body: Into, Req::Error: Into, Resp: FromBody, Resp::Error: Into, { const CODEC: MessageCodec = MessageCodec::Json; let timeout = opts.timeout; let url = self.base_url.join(path.as_ref())?; let body = msg.into_body(&CODEC).map_err(Into::into)?; let req = crate::http::unary_request(url.as_str(), CODEC, body, opts)?; let resp_fut = async { let mut resp = self.execute_request(req.try_into()?).await?; let resp_info = ResponseInfo::new(std::mem::take(resp.headers_mut())); let body = resp.bytes().await?; let resp_msg = Resp::from_body(body, &MessageCodec::Json) .await .map_err(Into::into)?; Ok((resp_msg, resp_info)) }; if timeout.is_zero() { resp_fut.await } else { tokio::time::timeout(timeout, resp_fut) .await .unwrap_or_else(|_elapsed| { Err(StatusError::new(Code::DeadlineExceeded, "client timeout elapsed").into()) }) } } async fn execute_request(&self, req: reqwest::Request) -> anyhow::Result { let mut resp = self.http.execute(req).await?; if !resp.status().is_success() { let code = crate::http::status_to_code(resp.status()); let metadata = Metadata::from_headers(std::mem::take(resp.headers_mut())); let body = resp.bytes().await?; let status = serde_json::from_slice(&body).unwrap_or_else(|err| { if !body.is_empty() { tracing::warn!("Failed to parse error response body: {err}"); } Status::new(code, code.to_string()) }); return Err(StatusError { status, metadata }.into()); } Ok(resp) } }