// #![feature(async_fn_in_trait)] // use futures::{future, SinkExt, Stream, StreamExt, TryStreamExt}; // use http::{ // header::{CONNECTION, SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, UPGRADE}, // Request, Response, StatusCode, // }; // use hyper::{upgrade::Upgraded, Body}; // use std::{ // collections::HashMap, // convert::Infallible, // fmt::Debug, // net::SocketAddr, // pin::Pin, // sync::Arc, // task::{Context, Poll}, // time::Duration, // }; // use serde::de::DeserializeOwned; // use serde_json::Value; // use std::sync::Mutex; // use tokio::sync::{ // mpsc::{unbounded_channel, UnboundedSender}, // oneshot::Sender, // }; // use tokio_tungstenite::{ // self, // tungstenite::{handshake::derive_accept_key, protocol::Role, Message}, // WebSocketStream, // }; // use dafunk_core::{ // payload::Payload, // types::{ // action::{GetStatus, OnebotAction}, // response::OnebotActionResponse, // *, // }, // }; // use dafunk_core::obc::OneBotConnection; // type ActionMap = Mutex>>; // type ResponseSender = Sender>; // type ResponseMap = Mutex>; // pub struct EventStream { // receiver: tokio::sync::mpsc::UnboundedReceiver, // } // impl EventStream { // /// Create a new event stream. // /// // /// ``` // /// let (tx, rx) = unbounded_channel::(); // /// let stream = EventStream::new(rx); // /// ``` // /// // pub fn new(receiver: tokio::sync::mpsc::UnboundedReceiver) -> Self { // Self { receiver } // } // } // impl Stream for EventStream { // type Item = E; // fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // match self.receiver.poll_recv(cx) { // Poll::Ready(Some(event)) => Poll::Ready(Some(event)), // Poll::Ready(None) => Poll::Ready(None), // Poll::Pending => Poll::Pending, // } // } // } // /// A connection to a OneBot implementation using the WebSocket Reverse protocol. // /// // /// See [Onebot-反向Websocket](https://12.onebot.dev/connect/communication/websocket-reverse/) for more information. // pub struct WSRConn { // access_token: Option, // addr: SocketAddr, // responses: ResponseMap, // actions: ActionMap, // } // impl WSRConn { // /// Create a new [WSRConn] // /// ``` // /// use dafunk::connection::wsr::WSRConn; // /// // /// let conn = WSRConn::new("127.0.0.1:6700".parse().unwrap()); // /// ``` // /// // pub fn new(addr: SocketAddr) -> Self { // Self { // access_token: None, // addr, // responses: Mutex::new(HashMap::new()), // actions: Mutex::new(HashMap::new()), // } // } // /// Set the access token for the connection // pub fn token(self, access_token: String) -> Self { // Self { // access_token: Some(access_token), // ..self // } // } // } // async fn handle_connection( // ws_stream: WebSocketStream, // tx: UnboundedSender, // wsr: Arc, // ) where // E: DeserializeOwned + Debug + Send + Sync + 'static, // { // let (mut outgoing, incoming) = ws_stream.split(); // let (a_tx, mut a_rx) = unbounded_channel::(); // let broadcast_incoming = incoming.try_for_each(|msg| { // let value = match msg { // Message::Text(ref text) => serde_json::from_str::(&text).unwrap(), // Message::Binary(ref bin) => serde_json::from_slice::(&bin).unwrap(), // _ => return future::ok(()), // }; // let action_response = serde_json::from_value::>(value.clone()); // if let Ok(action_response) = action_response { // println!("action_response"); // if let Some(tx) = wsr // .responses // .lock() // .unwrap() // .remove(&action_response.echo.clone().unwrap_or_default()) // { // tx.send(action_response).ok(); // println!("action_response send"); // } // return future::ok(()); // } // let selft = serde_json::from_value::(value.clone()["self"].clone()); // if let Ok(selft) = selft { // wsr.actions // .lock() // .unwrap() // .insert(selft.clone(), a_tx.clone()); // } // let event = serde_json::from_value::(value); // if let Ok(event) = event { // tx.send(event).ok(); // } // future::ok(()) // }); // let fut = async move { // while let Some(action) = a_rx.recv().await { // outgoing.send(Message::Text(action)).await.ok(); // } // }; // tokio::select! { // _ = broadcast_incoming => {} // _ = fut => {} // } // } // async fn handle_request( // mut req: Request, // wsr: Arc, // tx: UnboundedSender, // ) -> Result, Infallible> // where // E: DeserializeOwned + Debug + Send + Sync + 'static, // { // if wsr.access_token.is_some() { // match req.headers().get("Authorization") { // Some(auth) if auth == wsr.access_token.as_ref().unwrap() => {} // _ => return Ok(Response::builder().status(401).body(Body::empty()).unwrap()), // }; // } // let headers = req.headers(); // let key = headers.get(SEC_WEBSOCKET_KEY); // let derived = key.map(|k| derive_accept_key(k.as_bytes())); // let wsr = wsr.clone(); // tokio::spawn(async move { // match hyper::upgrade::on(&mut req).await { // Ok(upgraded) => { // handle_connection( // WebSocketStream::from_raw_socket(upgraded, Role::Server, None).await, // tx, // wsr, // ) // .await // } // Err(_) => {} // } // }); // let mut res = Response::new(Body::empty()); // *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; // res.headers_mut() // .append(CONNECTION, "Upgrade".parse().unwrap()); // res.headers_mut() // .append(UPGRADE, "websocket".parse().unwrap()); // res.headers_mut().append( // SEC_WEBSOCKET_ACCEPT, // derived.unwrap_or("".into()).parse().unwrap(), // ); // Ok(res) // } // type BoxError = Box; // impl OneBotConnection for WSRConn { // type Error = BoxError; // type StreamOutput = EventStream; // async fn send( // self: Arc, // action: OnebotAction, // ) -> Result, Self::Error> // where // A: 'static, // A: Payload, // { // let mut action = action; // let (tx, rx) = tokio::sync::oneshot::channel::>(); // let echo = action // .echo // .clone() // .unwrap_or(uuid::Uuid::new_v4().to_string()); // action.echo = Some(echo.clone()); // let selft = action.self_.clone().unwrap(); // let payload = action.json(); // self.responses.lock().unwrap().insert(echo, tx); // let tx = self.actions.lock().unwrap().get(&selft).cloned(); // if let Some(tx) = tx { // tx.send(payload).ok(); // } else { // return Err("No connection for selft".into()); // } // tokio::time::timeout(Duration::from_secs(10), rx) // .await? // .map_err(|_| "timeout".into()) // .map(|res| res.into_response()) // } // async fn receive(self: Arc) -> Self::StreamOutput // where // E: 'static + Send + Sync, // E: DeserializeOwned, // E: Debug, // { // let (tx, rx) = unbounded_channel::(); // let wsr = self.clone(); // let make_svc_fn = hyper::service::make_service_fn(move |_| { // let wsr = wsr.clone(); // let tx = tx.clone(); // async move { // Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request| { // let wsr = wsr.clone(); // let tx: UnboundedSender = tx.clone(); // async move { handle_request(req, wsr, tx).await } // })) // } // }); // let server = hyper::Server::bind(&self.addr).serve(make_svc_fn); // tokio::spawn(server); // EventStream::new(rx) // } // } // #[tokio::main] // async fn main() { // let wsr = WSRConn::new("127.0.0.1:6703".parse().unwrap()); // let wsr = Arc::new(wsr); // let mut stream = wsr.clone().receive::().await; // while let Some(_) = stream.next().await { // let wsr = wsr.clone(); // tokio::spawn(async move { // let get_status = GetStatus {}; // let mut action = OnebotAction::new(get_status); // action.self_ = Some(Selft { // platform: "qq".into(), // user_id: "1057584970".into(), // }); // let res = wsr.send(action).await; // println!("{:?}", res); // }); // } // } fn main() {}