// Copyright 2024 Vincent Chan
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::Serialize;
use bson::Document;
use std::borrow::Borrow;
use std::sync::Weak;
use serde::de::DeserializeOwned;
use crate::options::UpdateOptions;
use crate::{Error, IndexModel, Result};
use crate::db::db_inner::DatabaseInner;
use crate::action::{Aggregate, Find};
use crate::results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult};

// Evaluates `$action`. On `Ok` the macro yields the inner value; on `Err` it
// returns from the enclosing function with `$err` and the new error merged
// through `$err.add(..)`, so neither failure is lost.
macro_rules! try_multiple {
    ($err: expr, $action: expr) => {
        match $action {
            Ok(ret) => ret,
            Err(expr_err) => {
                return Err($err.add(expr_err))
            },
        }
    }
}

// Runs `$action` against the transaction `$txn`.
//
// * `Ok`  - commits the transaction (a commit error is propagated with `?`)
//           and yields the operation's value.
// * `Err` - rolls the transaction back and returns the operation's error from
//           the enclosing function; if the rollback fails as well, the two
//           errors are combined by `try_multiple!`.
macro_rules! try_db_op {
    ($txn: expr, $action: expr) => {
        match $action {
            Ok(ret) => {
                $txn.commit()?;
                ret
            }

            Err(err) => {
                try_multiple!(err, $txn.rollback());
                return Err(err);
            }
        }
    }
}

/// Collection-level operations over documents of type `T`.
pub trait CollectionT<T> {
    /// Returns the name of this collection.
    fn name(&self) -> &str;

    /// Returns the number of documents in the collection.
    fn count_documents(&self) -> Result<u64>;

    /// Updates up to one document matching `query` in the collection.
    /// See the [documentation](https://www.polodb.org/docs/curd/update) for more
    /// information on specifying updates.
    fn update_one(&self, query: Document, update: Document) -> Result<UpdateResult>;

    /// Same as [`CollectionT::update_one`], with caller-supplied `options`.
    fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;

    /// Updates all documents matching `query` in the collection.
    /// See the [documentation](https://www.polodb.org/docs/curd/update) for more
    /// information on specifying updates.
    fn update_many(&self, query: Document, update: Document) -> Result<UpdateResult>;

    /// Same as [`CollectionT::update_many`], with caller-supplied `options`.
    fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result<UpdateResult>;

    /// Deletes up to one document found matching `query`.
    fn delete_one(&self, query: Document) -> Result<DeleteResult>;

    /// Deletes the documents matching `query` and reports how much was deleted.
    ///
    /// NOTE(review): an earlier revision of this comment said that passing
    /// `None` deletes all data; `query` is a plain `Document` here, so an empty
    /// document presumably plays that role - confirm against the engine.
    fn delete_many(&self, query: Document) -> Result<DeleteResult>;

    /// Creates the index described by `index` on this collection.
    fn create_index(&self, index: IndexModel) -> Result<()>;

    /// Drops the index specified by `name` from this collection.
    fn drop_index(&self, name: impl AsRef<str>) -> Result<()>;

    /// Drops this collection.
    fn drop(&self) -> Result<()>;

    /// Inserts `doc` into the collection.
    fn insert_one(&self, doc: impl Borrow<T>) -> Result<InsertOneResult>
    where
        T: Serialize;

    /// Inserts the data in `docs` into the collection.
    fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> Result<InsertManyResult>
    where
        T: Serialize;

    /// Builds a [`Find`] action over this collection for the query document
    /// `filter`; the results satisfy `filter`.
    fn find(&self, filter: Document) -> Find<'_, '_, T>
    where
        T: DeserializeOwned + Send + Sync;

    /// Finds a single document in the collection matching `filter`.
    fn find_one(&self, filter: Document) -> Result<Option<T>>
    where
        T: DeserializeOwned + Send + Sync;

    /// Runs an aggregation operation.
    fn aggregate(&self, pipeline: impl IntoIterator<Item = Document>) -> Aggregate<'_, '_>;
}

/// A wrapper of a collection in a struct.
///
/// All CRUD methods can be done through this structure.
///
/// It can be used to perform collection-level operations such as CRUD operations.
pub struct Collection { db: Weak, name: String, _phantom: std::marker::PhantomData, } impl Collection { pub(crate) fn new(db: Weak, name: &str) -> Collection { Collection { db, name: name.into(), _phantom: std::default::Default::default(), } } } impl CollectionT for Collection { fn name(&self) -> &str { &self.name } fn count_documents(&self) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let count = db.count_documents(&self.name, &txn)?; Ok(count) } fn update_one(&self, query: Document, update: Document) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.update_one( &self.name, query, update, UpdateOptions::default(), &txn, )); Ok(result) } fn update_one_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.update_one( &self.name, query, update, options, &txn, )); Ok(result) } fn update_many(&self, query: Document, update: Document) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.update_many( &self.name, query, update, UpdateOptions::default(), &txn, )); Ok(result) } fn update_many_with_options(&self, query: Document, update: Document, options: UpdateOptions) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.update_many( &self.name, query, update, options, &txn, )); Ok(result) } fn delete_one(&self, query: Document) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.delete_one(&self.name, query, &txn)); Ok(result) } fn delete_many(&self, query: Document) -> Result { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = 
db.start_transaction()?; let result = try_db_op!(txn, db.delete_many(&self.name, query, &txn)); Ok(result) } fn create_index(&self, index: IndexModel) -> Result<()> { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; try_db_op!(txn, db.create_index(&self.name, index, &txn)); Ok(()) } fn drop_index(&self, name: impl AsRef) -> Result<()> { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; try_db_op!(txn, db.drop_index(&self.name, name.as_ref(), &txn)); Ok(()) } fn drop(&self) -> Result<()> { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; try_db_op!(txn, db.drop_collection(&self.name, &txn)); Ok(()) } fn insert_one(&self, doc: impl Borrow) -> Result where T: Serialize { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.insert_one( &self.name, bson::to_document(doc.borrow())?, &txn, )); Ok(result) } fn insert_many(&self, docs: impl IntoIterator>) -> Result where T: Serialize { let db = self.db.upgrade().ok_or(Error::DbIsClosed)?; let txn = db.start_transaction()?; let result = try_db_op!(txn, db.insert_many(&self.name, docs, &txn)); Ok(result) } fn find(&self, filter: Document) -> Find where T: DeserializeOwned + Send + Sync { Find::new(self.db.clone(), &self.name, None, filter) } fn find_one(&self, filter: Document) -> Result> where T: DeserializeOwned + Send + Sync { let mut cursor = self.find(filter).run()?; let test = cursor.advance()?; if !test { return Ok(None); } Ok(Some(cursor.deserialize_current()?)) } fn aggregate(&self, pipeline: impl IntoIterator) -> Aggregate<'_, '_> { Aggregate::new( self.db.clone(), &self.name, pipeline.into_iter().collect(), None, ) } }