use crate::connection::ConnectionRef; use crate::{CancelToken, CopyInWriter, CopyOutReader, Portal, RowIter, Statement, ToStatement}; use tokio_opengauss::types::{BorrowToSql, ToSql, Type}; use tokio_opengauss::{Error, Row, SimpleQueryMessage}; /// A representation of a openGauss database transaction. /// /// Transactions will implicitly roll back by default when dropped. Use the `commit` method to commit the changes made /// in the transaction. Transactions can be nested, with inner transactions implemented via savepoints. pub struct Transaction<'a> { connection: ConnectionRef<'a>, transaction: Option>, } impl<'a> Drop for Transaction<'a> { fn drop(&mut self) { if let Some(transaction) = self.transaction.take() { let _ = self.connection.block_on(transaction.rollback()); } } } impl<'a> Transaction<'a> { pub(crate) fn new( connection: ConnectionRef<'a>, transaction: tokio_opengauss::Transaction<'a>, ) -> Transaction<'a> { Transaction { connection, transaction: Some(transaction), } } /// Consumes the transaction, committing all changes made within it. pub fn commit(mut self) -> Result<(), Error> { self.connection .block_on(self.transaction.take().unwrap().commit()) } /// Rolls the transaction back, discarding all changes made within it. /// /// This is equivalent to `Transaction`'s `Drop` implementation, but provides any error encountered to the caller. pub fn rollback(mut self) -> Result<(), Error> { self.connection .block_on(self.transaction.take().unwrap().rollback()) } /// Like `Client::prepare`. pub fn prepare(&mut self, query: &str) -> Result { self.connection .block_on(self.transaction.as_ref().unwrap().prepare(query)) } /// Like `Client::prepare_typed`. pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result { self.connection.block_on( self.transaction .as_ref() .unwrap() .prepare_typed(query, types), ) } /// Like `Client::execute`. pub fn execute(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result where T: ?Sized + ToStatement, { self.connection .block_on(self.transaction.as_ref().unwrap().execute(query, params)) } /// Like `Client::query`. pub fn query(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result, Error> where T: ?Sized + ToStatement, { self.connection .block_on(self.transaction.as_ref().unwrap().query(query, params)) } /// Like `Client::query_one`. pub fn query_one(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result where T: ?Sized + ToStatement, { self.connection .block_on(self.transaction.as_ref().unwrap().query_one(query, params)) } /// Like `Client::query_opt`. pub fn query_opt( &mut self, query: &T, params: &[&(dyn ToSql + Sync)], ) -> Result, Error> where T: ?Sized + ToStatement, { self.connection .block_on(self.transaction.as_ref().unwrap().query_opt(query, params)) } /// Like `Client::query_raw`. pub fn query_raw(&mut self, query: &T, params: I) -> Result, Error> where T: ?Sized + ToStatement, P: BorrowToSql, I: IntoIterator, I::IntoIter: ExactSizeIterator, { let stream = self .connection .block_on(self.transaction.as_ref().unwrap().query_raw(query, params))?; Ok(RowIter::new(self.connection.as_ref(), stream)) } /// Binds parameters to a statement, creating a "portal". /// /// Portals can be used with the `query_portal` method to page through the results of a query without being forced /// to consume them all immediately. /// /// Portals are automatically closed when the transaction they were created in is closed. /// /// # Panics /// /// Panics if the number of parameters provided does not match the number expected. pub fn bind(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result where T: ?Sized + ToStatement, { self.connection .block_on(self.transaction.as_ref().unwrap().bind(query, params)) } /// Continues execution of a portal, returning the next set of rows. /// /// Unlike `query`, portals can be incrementally evaluated by limiting the number of rows returned in each call to /// `query_portal`. If the requested number is negative or 0, all remaining rows will be returned. pub fn query_portal(&mut self, portal: &Portal, max_rows: i32) -> Result, Error> { self.connection.block_on( self.transaction .as_ref() .unwrap() .query_portal(portal, max_rows), ) } /// The maximally flexible version of `query_portal`. pub fn query_portal_raw( &mut self, portal: &Portal, max_rows: i32, ) -> Result, Error> { let stream = self.connection.block_on( self.transaction .as_ref() .unwrap() .query_portal_raw(portal, max_rows), )?; Ok(RowIter::new(self.connection.as_ref(), stream)) } /// Like `Client::copy_in`. pub fn copy_in(&mut self, query: &T) -> Result, Error> where T: ?Sized + ToStatement, { let sink = self .connection .block_on(self.transaction.as_ref().unwrap().copy_in(query))?; Ok(CopyInWriter::new(self.connection.as_ref(), sink)) } /// Like `Client::copy_out`. pub fn copy_out(&mut self, query: &T) -> Result, Error> where T: ?Sized + ToStatement, { let stream = self .connection .block_on(self.transaction.as_ref().unwrap().copy_out(query))?; Ok(CopyOutReader::new(self.connection.as_ref(), stream)) } /// Like `Client::simple_query`. pub fn simple_query(&mut self, query: &str) -> Result, Error> { self.connection .block_on(self.transaction.as_ref().unwrap().simple_query(query)) } /// Like `Client::batch_execute`. pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> { self.connection .block_on(self.transaction.as_ref().unwrap().batch_execute(query)) } /// Like `Client::cancel_token`. pub fn cancel_token(&self) -> CancelToken { CancelToken::new(self.transaction.as_ref().unwrap().cancel_token()) } /// Like `Client::transaction`, but creates a nested transaction via a savepoint. pub fn transaction(&mut self) -> Result, Error> { let transaction = self .connection .block_on(self.transaction.as_mut().unwrap().transaction())?; Ok(Transaction::new(self.connection.as_ref(), transaction)) } /// Like `Client::transaction`, but creates a nested transaction via a savepoint with the specified name. pub fn savepoint(&mut self, name: I) -> Result, Error> where I: Into, { let transaction = self .connection .block_on(self.transaction.as_mut().unwrap().savepoint(name))?; Ok(Transaction::new(self.connection.as_ref(), transaction)) } }