use crate::{Error, Notification}; use futures::future; use futures::{pin_mut, Stream}; use std::collections::VecDeque; use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::runtime::Runtime; use tokio_opengauss::error::DbError; use tokio_opengauss::AsyncMessage; pub struct Connection { runtime: Runtime, connection: Pin> + Send>>, notifications: VecDeque, notice_callback: Arc, } impl Connection { pub fn new( runtime: Runtime, connection: tokio_opengauss::Connection, notice_callback: Arc, ) -> Connection where S: AsyncRead + AsyncWrite + Unpin + 'static + Send, T: AsyncRead + AsyncWrite + Unpin + 'static + Send, { Connection { runtime, connection: Box::pin(ConnectionStream { connection }), notifications: VecDeque::new(), notice_callback, } } pub fn as_ref(&mut self) -> ConnectionRef<'_> { ConnectionRef { connection: self } } pub fn enter(&self, f: F) -> T where F: FnOnce() -> T, { let _guard = self.runtime.enter(); f() } pub fn block_on(&mut self, future: F) -> Result where F: Future>, { pin_mut!(future); self.poll_block_on(|cx, _, _| future.as_mut().poll(cx)) } pub fn poll_block_on(&mut self, mut f: F) -> Result where F: FnMut(&mut Context<'_>, &mut VecDeque, bool) -> Poll>, { let connection = &mut self.connection; let notifications = &mut self.notifications; let notice_callback = &mut self.notice_callback; self.runtime.block_on({ future::poll_fn(|cx| { let done = loop { match connection.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(AsyncMessage::Notification(notification)))) => { notifications.push_back(notification); } Poll::Ready(Some(Ok(AsyncMessage::Notice(notice)))) => { notice_callback(notice) } Poll::Ready(Some(Ok(_))) => {} Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e)), Poll::Ready(None) => break true, Poll::Pending => break false, } }; f(cx, notifications, done) }) }) } pub fn notifications(&self) -> &VecDeque { &self.notifications } pub fn notifications_mut(&mut self) -> &mut VecDeque { &mut self.notifications } } pub struct ConnectionRef<'a> { connection: &'a mut Connection, } // no-op impl to extend the borrow until drop impl Drop for ConnectionRef<'_> { #[inline] fn drop(&mut self) {} } impl Deref for ConnectionRef<'_> { type Target = Connection; #[inline] fn deref(&self) -> &Connection { self.connection } } impl DerefMut for ConnectionRef<'_> { #[inline] fn deref_mut(&mut self) -> &mut Connection { self.connection } } struct ConnectionStream { connection: tokio_opengauss::Connection, } impl Stream for ConnectionStream where S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.connection.poll_message(cx) } }