use async_trait::async_trait;
use eternalfest_core::blob::{BlobId, BlobStore, GetBlobDataOptions, GetBlobOptions, MediaType};
use eternalfest_core::clock::Clock;
use eternalfest_core::core::Instant;
use eternalfest_core::file::{
  CreateDirectoryError, CreateFileError, CreateStoreDirectoryOptions, CreateStoreFileOptions, DeleteItemError,
  DeleteItemOptions, Directory, DirectoryId, DirectoryIdRef, Drive, DriveId, DriveItem, DriveItemDisplayName, File,
  FileId, FileStore, GetDirectoryChildrenError, GetDirectoryError, GetDirectoryOptions, GetDriveByOwnerError,
  GetDriveByOwnerOptions, GetDriveError, GetDriveOptions, GetFileDataError, GetFileError, GetFileOptions,
  GetItemByPathError, GetItemByPathOptions,
};
use eternalfest_core::pg_num::PgU32;
use eternalfest_core::types::{AnyError, ApiRef};
use eternalfest_core::user::{UserId, UserIdRef};
use eternalfest_core::uuid::UuidGenerator;
use sqlx::postgres::{PgPool, PgQueryResult};

pub struct PgFileStore<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator>
where
  TyBlobStore: BlobStore,
  TyClock: Clock,
  TyDatabase: ApiRef<PgPool>,
  TyUuidGenerator: UuidGenerator,
{
  blob_store: TyBlobStore,
  clock: TyClock,
  database: TyDatabase,
  uuid_generator: TyUuidGenerator,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct DirectoryMeta {
  owner: UserIdRef,
}

impl<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator> PgFileStore<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator>
where
  TyBlobStore: BlobStore,
  TyClock: Clock,
  TyDatabase: ApiRef<PgPool>,
  TyUuidGenerator: UuidGenerator,
{
  pub fn new(blob_store: TyBlobStore, clock: TyClock, database: TyDatabase, uuid_generator: TyUuidGenerator) -> Self {
    Self {
      blob_store,
      clock,
      database,
      uuid_generator,
    }
  }

  /// Resolve the owner of the drive containing the directory `item_id`.
  async fn get_directory_meta(&self, item_id: DirectoryId) -> Result<Option<DirectoryMeta>, AnyError> {
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      owner_id: UserId,
    }
    let row: Option<Row> = sqlx::query_as::<_, Row>(
      r"
      SELECT owner_id
      FROM drives
        INNER JOIN drive_item_closure ON (drives.root_id = drive_item_closure.ancestor_id)
        INNER JOIN drive_items ON (drive_item_closure.descendant_id = drive_items.drive_item_id)
      WHERE drive_item_closure.descendant_id = $1::DIRECTORY_ID AND drive_items.type = 'directory';
      ",
    )
    .bind(item_id)
    // It is safe to run this outside of a transaction: the drive can't change for a given item,
    // and the owner can't change for a given drive.
    .fetch_optional(self.database.as_ref())
    .await?;
    Ok(row.map(|row| DirectoryMeta {
      owner: UserIdRef { id: row.owner_id },
    }))
  }
}
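// How ancestry is tracked: `drive_item_closure` is a transitive-closure table
// holding one row per (ancestor, descendant) pair, including a self-pair at
// distance 0. As an illustration, a tree `root -> a -> b` contains:
//
//   ancestor_id | descendant_id | distance
//   ------------+---------------+----------
//   root        | root          | 0
//   root        | a             | 1
//   a           | a             | 0
//   root        | b             | 2
//   a           | b             | 1
//   b           | b             | 0
//
// This is what lets `get_directory_meta` above resolve the owning drive of any
// item with a single indexed join instead of a recursive CTE.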
#[async_trait]
impl<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator> FileStore
  for PgFileStore<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator>
where
  TyBlobStore: BlobStore,
  TyClock: Clock,
  TyDatabase: ApiRef<PgPool>,
  TyUuidGenerator: UuidGenerator,
{
  async fn get_drive(&self, options: &GetDriveOptions) -> Result<Drive, GetDriveError> {
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      drive_id: DriveId,
      owner_id: UserId,
      created_at: Instant,
      updated_at: Instant,
      root_id: DirectoryId,
    }
    let row: Row = sqlx::query_as::<_, Row>(
      r"
      SELECT drive_id, owner_id, created_at, updated_at, root_id
      FROM drives
      WHERE drives.drive_id = $1::DRIVE_ID;
      ",
    )
    .bind(options.id)
    .fetch_optional(self.database.as_ref())
    .await
    .map_err(|e| GetDriveError::Other(e.into()))?
    .ok_or(GetDriveError::NotFound(options.id))?;
    Ok(Drive {
      id: row.drive_id,
      owner: UserIdRef { id: row.owner_id },
      root: DirectoryIdRef { id: row.root_id },
      created_at: row.created_at,
      updated_at: row.updated_at,
    })
  }

  async fn get_drive_by_owner(&self, options: &GetDriveByOwnerOptions) -> Result<Drive, GetDriveByOwnerError> {
    let now = self.clock.now();
    let mut tx = self
      .database
      .as_ref()
      .begin()
      .await
      .map_err(GetDriveByOwnerError::other)?;
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      drive_id: DriveId,
      owner_id: UserId,
      created_at: Instant,
      updated_at: Instant,
      root_id: DirectoryId,
    }
    let row: Option<Row> = sqlx::query_as::<_, Row>(
      r"
      SELECT drive_id, owner_id, created_at, updated_at, root_id
      FROM drives
      WHERE drives.owner_id = $1::USER_ID;
      ",
    )
    .bind(options.id)
    .fetch_optional(&mut tx)
    .await
    .map_err(|e| GetDriveByOwnerError::Other(e.into()))?;
    let row = match row {
      Some(row) => row,
      None => {
        // The user has no drive yet: create the `$root` directory item, then the drive itself.
        let new_drive_id = DriveId::from_uuid(self.uuid_generator.next());
        let new_root_id = DirectoryId::from_uuid(self.uuid_generator.next());
        {
          let res: PgQueryResult = sqlx::query(
            r"
            INSERT INTO drive_items(drive_item_id, type, created_at, updated_at, display_name)
            VALUES ($1::DIRECTORY_ID, 'directory', $2::INSTANT, $2::INSTANT, '$root');
            ",
          )
          .bind(new_root_id)
          .bind(now)
          .execute(&mut tx)
          .await
          .map_err(|e| GetDriveByOwnerError::Other(e.into()))?;
          assert_eq!(res.rows_affected(), 1);
          let res: PgQueryResult = sqlx::query(
            r"
            INSERT INTO directories(drive_item_id, type)
            VALUES ($1::DIRECTORY_ID, 'directory');
            ",
          )
          .bind(new_root_id)
          .execute(&mut tx)
          .await
          .map_err(|e| GetDriveByOwnerError::Other(e.into()))?;
          assert_eq!(res.rows_affected(), 1);
          let res: PgQueryResult = sqlx::query(
            r"
            INSERT INTO drive_item_closure(ancestor_id, descendant_id, distance)
            VALUES ($1::DIRECTORY_ID, $1::DIRECTORY_ID, 0);
            ",
          )
          .bind(new_root_id)
          .execute(&mut tx)
          .await
          .map_err(|e| GetDriveByOwnerError::Other(e.into()))?;
          assert_eq!(res.rows_affected(), 1);
        }
        sqlx::query_as::<_, Row>(
          r"
          INSERT INTO drives (drive_id, created_at, updated_at, owner_id, root_id)
          VALUES ($1::DRIVE_ID, $2::INSTANT, $2::INSTANT, $3::USER_ID, $4::DIRECTORY_ID)
          RETURNING drive_id, owner_id, created_at, updated_at, root_id;
          ",
        )
        .bind(new_drive_id)
        .bind(now)
        .bind(options.id)
        .bind(new_root_id)
        .fetch_one(&mut tx)
        .await
        .map_err(|e| GetDriveByOwnerError::Other(e.into()))?
      }
    };
    tx.commit().await.map_err(GetDriveByOwnerError::other)?;
    Ok(Drive {
      id: row.drive_id,
      owner: UserIdRef { id: row.owner_id },
      root: DirectoryIdRef { id: row.root_id },
      created_at: row.created_at,
      updated_at: row.updated_at,
    })
  }
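  // Insertion order in `get_drive_by_owner` above matters: the `$root` item, its
  // `directories` row, and its self-closure row are created before the `drives`
  // row, presumably so that `drives.root_id` never references a missing item;
  // the whole sequence commits atomically.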
  async fn get_item_by_path(&self, _options: &GetItemByPathOptions) -> Result<DriveItem, GetItemByPathError> {
    todo!()
  }

  async fn create_directory(&self, options: &CreateStoreDirectoryOptions) -> Result<Directory, CreateDirectoryError> {
    let parent = self
      .get_directory_meta(options.parent_id)
      .await
      .map_err(CreateDirectoryError::Other)?;
    let parent = match parent {
      Some(parent) => parent,
      None => return Err(CreateDirectoryError::ParentNotFound),
    };
    if let Some(expected_owner) = options.check_owner {
      if parent.owner.id != expected_owner {
        return Err(CreateDirectoryError::NotOwner);
      }
    }
    let now = self.clock.now();
    let new_dir_id = DirectoryId::from_uuid(self.uuid_generator.next());
    let mut tx = self
      .database
      .as_ref()
      .begin()
      .await
      .map_err(CreateDirectoryError::other)?;
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      created_at: Instant,
      updated_at: Instant,
    }
    let row: Row = sqlx::query_as(
      r"
      INSERT INTO drive_items(drive_item_id, type, created_at, updated_at, display_name)
      VALUES ($1::DIRECTORY_ID, 'directory', $2::INSTANT, $2::INSTANT, $3::DRIVE_ITEM_DISPLAY_NAME)
      RETURNING created_at, updated_at;
      ",
    )
    .bind(new_dir_id)
    .bind(now)
    .bind(&options.display_name)
    .fetch_one(&mut tx)
    .await
    .map_err(|e| CreateDirectoryError::Other(e.into()))?;
    let res: PgQueryResult = sqlx::query(
      r"
      INSERT INTO directories(drive_item_id, type)
      VALUES ($1::DIRECTORY_ID, 'directory');
      ",
    )
    .bind(new_dir_id)
    .execute(&mut tx)
    .await
    .map_err(|e| CreateDirectoryError::Other(e.into()))?;
    assert_eq!(res.rows_affected(), 1);
    // Add one closure link for each ancestor of the parent, plus the self-link.
    let res: PgQueryResult = sqlx::query(
      r"
      INSERT INTO drive_item_closure(ancestor_id, descendant_id, distance)
        (
          SELECT ancestor_id, $1::DIRECTORY_ID, distance + 1
          FROM drive_item_closure
          WHERE descendant_id = $2::DIRECTORY_ID
        )
        UNION ALL
        SELECT $1::DIRECTORY_ID, $1::DIRECTORY_ID, 0;
      ",
    )
    .bind(new_dir_id)
    .bind(options.parent_id)
    .execute(&mut tx)
    .await
    .map_err(|e| CreateDirectoryError::Other(e.into()))?;
    // At least the root link and the self-link are guaranteed.
    assert!(res.rows_affected() >= 2);
    tx.commit().await.map_err(CreateDirectoryError::other)?;
    Ok(Directory {
      id: new_dir_id,
      created_at: row.created_at,
      updated_at: row.updated_at,
      display_name: options.display_name.clone(),
      children: Some(Vec::new()),
    })
  }
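  // Note on `Directory::children`: it is `Some` only when the listing is
  // actually known. `create_directory` above returns `Some(Vec::new())` since a
  // freshly created directory is empty by construction, while `get_directory`
  // below returns `None` because it does not fetch children.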
  async fn get_directory(&self, options: &GetDirectoryOptions) -> Result<Directory, GetDirectoryError> {
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      drive_item_id: DirectoryId,
      created_at: Instant,
      updated_at: Instant,
      display_name: DriveItemDisplayName,
    }
    let row: Row = sqlx::query_as::<_, Row>(
      r"
      SELECT di.drive_item_id, di.created_at, di.updated_at, di.display_name
      FROM drive_items di
      WHERE di.drive_item_id = $1::DIRECTORY_ID AND di.type = 'directory';
      ",
    )
    .bind(options.id)
    .fetch_optional(self.database.as_ref())
    .await
    .map_err(|e| GetDirectoryError::Other(e.into()))?
    .ok_or(GetDirectoryError::NotFound(options.id))?;
    Ok(Directory {
      id: row.drive_item_id,
      display_name: row.display_name,
      children: None,
      created_at: row.created_at,
      updated_at: row.updated_at,
    })
  }

  async fn get_directory_children(
    &self,
    _options: &GetDirectoryOptions,
  ) -> Result<Vec<DriveItem>, GetDirectoryChildrenError> {
    todo!()
  }

  async fn create_file(&self, options: &CreateStoreFileOptions) -> Result<File, CreateFileError> {
    let parent = self
      .get_directory_meta(options.parent_id)
      .await
      .map_err(CreateFileError::Other)?;
    let parent = match parent {
      Some(parent) => parent,
      None => return Err(CreateFileError::ParentNotFound),
    };
    if let Some(expected_owner) = options.check_owner {
      if parent.owner.id != expected_owner {
        return Err(CreateFileError::NotOwner);
      }
    }
    // Resolve the blob first so its `media_type` and `byte_size` can be returned with the file.
    let blob = self
      .blob_store
      .get_blob(&GetBlobOptions { id: options.blob_id })
      .await
      .map_err(|e| CreateFileError::Other(e.into()))?;
    let now = self.clock.now();
    let new_file_id = FileId::from_uuid(self.uuid_generator.next());
    let mut tx = self.database.as_ref().begin().await.map_err(CreateFileError::other)?;
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      created_at: Instant,
      updated_at: Instant,
    }
    let row: Row = sqlx::query_as(
      r"
      INSERT INTO drive_items(drive_item_id, type, created_at, updated_at, display_name)
      VALUES ($1::FILE_ID, 'file', $2::INSTANT, $2::INSTANT, $3::DRIVE_ITEM_DISPLAY_NAME)
      RETURNING created_at, updated_at;
      ",
    )
    .bind(new_file_id)
    .bind(now)
    .bind(&options.display_name)
    .fetch_one(&mut tx)
    .await
    .map_err(|e| CreateFileError::Other(e.into()))?;
    let res: PgQueryResult = sqlx::query(
      r"
      INSERT INTO files(drive_item_id, type, blob_id)
      VALUES ($1::FILE_ID, 'file', $2::BLOB_ID);
      ",
    )
    .bind(new_file_id)
    .bind(options.blob_id)
    .execute(&mut tx)
    .await
    .map_err(|e| CreateFileError::Other(e.into()))?;
    assert_eq!(res.rows_affected(), 1);
    // Add one closure link for each ancestor of the parent, plus the self-link.
    let res: PgQueryResult = sqlx::query(
      r"
      INSERT INTO drive_item_closure(ancestor_id, descendant_id, distance)
        (
          SELECT ancestor_id, $1::FILE_ID, distance + 1
          FROM drive_item_closure
          WHERE descendant_id = $2::DIRECTORY_ID
        )
        UNION ALL
        SELECT $1::FILE_ID, $1::FILE_ID, 0;
      ",
    )
    .bind(new_file_id)
    .bind(options.parent_id)
    .execute(&mut tx)
    .await
    .map_err(|e| CreateFileError::Other(e.into()))?;
    // At least the root link and the self-link are guaranteed.
    assert!(res.rows_affected() >= 2);
    tx.commit().await.map_err(CreateFileError::other)?;
    Ok(File {
      id: new_file_id,
      created_at: row.created_at,
      updated_at: row.updated_at,
      display_name: options.display_name.clone(),
      media_type: blob.media_type,
      byte_size: blob.byte_size,
    })
  }
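  // `blobs.byte_size` comes back through the `PgU32` wrapper: `get_file` below
  // converts it with `.into()` when building the `File`, whereas `create_file`
  // above reuses the size already carried by the resolved blob.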
  async fn get_file(&self, options: &GetFileOptions) -> Result<File, GetFileError> {
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      drive_item_id: FileId,
      created_at: Instant,
      updated_at: Instant,
      display_name: DriveItemDisplayName,
      media_type: MediaType,
      byte_size: PgU32,
    }
    // TODO: Retrieve blob data using `self.blob_store`
    let row: Row = sqlx::query_as::<_, Row>(
      r"
      SELECT di.drive_item_id, di.type, di.created_at, di.updated_at, di.display_name, b.media_type, b.byte_size
      FROM drive_items di
        INNER JOIN files f ON (di.drive_item_id = f.drive_item_id)
        INNER JOIN blobs b ON (f.blob_id = b.blob_id)
      WHERE di.drive_item_id = $1::FILE_ID AND di.type = 'file';
      ",
    )
    .bind(options.id)
    .fetch_optional(self.database.as_ref())
    .await
    .map_err(|e| GetFileError::Other(e.into()))?
    .ok_or(GetFileError::NotFound(options.id))?;
    Ok(File {
      id: row.drive_item_id,
      display_name: row.display_name,
      created_at: row.created_at,
      updated_at: row.updated_at,
      media_type: row.media_type,
      byte_size: row.byte_size.into(),
    })
  }

  async fn get_file_data(&self, options: &GetFileOptions) -> Result<Vec<u8>, GetFileDataError> {
    #[derive(Debug, sqlx::FromRow)]
    struct Row {
      blob_id: BlobId,
    }
    let row: Row = sqlx::query_as::<_, Row>(
      r"
      SELECT blob_id
      FROM files
      WHERE drive_item_id = $1::FILE_ID;
      ",
    )
    .bind(options.id)
    .fetch_optional(self.database.as_ref())
    .await
    .map_err(|e| GetFileDataError::Other(e.into()))?
    .ok_or(GetFileDataError::NotFound(options.id))?;
    self
      .blob_store
      .get_blob_data(&GetBlobDataOptions { id: row.blob_id })
      .await
      .map_err(|e| GetFileDataError::Other(e.into()))
  }

  async fn delete_item(&self, _options: &DeleteItemOptions) -> Result<(), DeleteItemError> {
    todo!()
  }
}

#[cfg(feature = "neon")]
impl<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator> neon::prelude::Finalize
  for PgFileStore<TyBlobStore, TyClock, TyDatabase, TyUuidGenerator>
where
  TyBlobStore: BlobStore,
  TyClock: Clock,
  TyDatabase: ApiRef<PgPool>,
  TyUuidGenerator: UuidGenerator,
{
}

#[cfg(test)]
mod test {
  use super::PgFileStore;
  use crate::test::TestApi;
  use eternalfest_blob_store::pg::PgBlobStore;
  use eternalfest_buffer_store::fs::FsBufferStore;
  use eternalfest_core::blob::BlobStore;
  use eternalfest_core::clock::VirtualClock;
  use eternalfest_core::core::Instant;
  use eternalfest_core::file::FileStore;
  use eternalfest_core::user::UserStore;
  use eternalfest_core::uuid::Uuid4Generator;
  use eternalfest_db_schema::force_create_latest;
  use eternalfest_user_store::pg::PgUserStore;
  use serial_test::serial;
  use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
  use sqlx::PgPool;
  use std::sync::Arc;

  async fn make_test_api(
  ) -> TestApi<Arc<dyn BlobStore>, Arc<VirtualClock>, Arc<dyn FileStore>, Arc<dyn UserStore>, Arc<Uuid4Generator>> {
    let config = eternalfest_config::find_config(std::env::current_dir().unwrap()).unwrap();
    // Reset the schema with admin credentials, then connect with the regular user.
    let admin_database: PgPool = PgPoolOptions::new()
      .max_connections(5)
      .connect_with(
        PgConnectOptions::new()
          .host(&config.db.host)
          .port(config.db.port)
          .database(&config.db.name)
          .username(&config.db.admin_user)
          .password(&config.db.admin_password),
      )
      .await
      .unwrap();
    force_create_latest(&admin_database, true).await.unwrap();
    admin_database.close().await;

    let database: PgPool = PgPoolOptions::new()
      .max_connections(5)
      .connect_with(
        PgConnectOptions::new()
          .host(&config.db.host)
          .port(config.db.port)
          .database(&config.db.name)
          .username(&config.db.user)
          .password(&config.db.password),
      )
      .await
      .unwrap();
    let database = Arc::new(database);

    let clock = Arc::new(VirtualClock::new(Instant::ymd_hms(2020, 1, 1, 0, 0, 0)));
    let uuid_generator = Arc::new(Uuid4Generator);
    let data_root = config.data.root.to_file_path().expect("InvalidDataRoot");
    let buffer_store = Arc::new(FsBufferStore::new(uuid_generator.clone(), data_root).await);
    let blob_store: Arc<dyn BlobStore> = Arc::new(PgBlobStore::new(
      buffer_store,
      clock.clone(),
      database.clone(),
      uuid_generator.clone(),
    ));
    let user_store: Arc<dyn UserStore> = Arc::new(PgUserStore::new(clock.clone(), database.clone()));
    let file_store: Arc<dyn FileStore> = Arc::new(PgFileStore::new(
      blob_store.clone(),
      clock.clone(),
      database,
      uuid_generator.clone(),
    ));

    TestApi {
      blob_store,
      clock,
      file_store,
      user_store,
      uuid_generator,
    }
  }

  test_file_store!(
    #[serial]
    || make_test_api().await
  );
}