// Shared async helpers for exercising the TOS client: a tokio-backed sleeper,
// a context that owns two clients and tracks buckets for cleanup, and small
// utilities for draining a response body stream.
//
// NOTE(review): several generic argument lists in this file look truncated
// (`TosClientImpl, ...>`, `Mutex>`, `tos::builder::()`, `impl Into`, `Vec`,
// `dyn Stream> + Unpin`). The code is kept exactly as found; restore the
// missing type arguments from version control before building.
use std::env;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::sync::Mutex;
use ve_tos_rust_sdk::asynchronous::bucket::BucketAPI;
use ve_tos_rust_sdk::asynchronous::object::ObjectAPI;
use ve_tos_rust_sdk::asynchronous::tos;
use ve_tos_rust_sdk::asynchronous::tos::{AsyncSleeper, TosClient, TosClientImpl};
use ve_tos_rust_sdk::bucket::{CreateBucketInput, DeleteBucketInput, HeadBucketInput};
use ve_tos_rust_sdk::credential::{CommonCredentials, CommonCredentialsProvider};
use ve_tos_rust_sdk::object::{DeleteObjectInput, ListObjectsType2Input};
use crate::common::gen_random_string;

/// [`AsyncSleeper`] implementation that delegates to `tokio::time::sleep`.
#[derive(Debug, Default)]
pub struct TokioSleeper {}

#[async_trait]
impl AsyncSleeper for TokioSleeper {
    /// Suspends the current task for `duration` without blocking the thread.
    async fn sleep(&self, duration: Duration) {
        tokio::time::sleep(duration).await;
    }
}

// Concrete client type used by every helper in this file.
// NOTE(review): the first type argument appears truncated; judging from the
// imports it is presumably built from `CommonCredentialsProvider` - confirm.
type DefaultTosClient = TosClientImpl, CommonCredentials, TokioSleeper>;

/// Test fixture that owns two TOS clients and remembers which buckets were
/// registered so they can be emptied and deleted in [`AsyncContext::tear_down`].
pub struct AsyncContext {
    // Client built against the `TOS_ENDPOINT` endpoint.
    client: DefaultTosClient,
    // Client built against the `TOS_HTTPS_ENDPOINT` endpoint.
    https_client: DefaultTosClient,
    // Names registered through `add_bucket`; guarded by an async mutex so
    // registration works through `&self`.
    buckets: Mutex>,
    // Bucket created by `create_async_context`; empty until then.
    fixed_bucket: String,
    // Name for which `head_bucket` returned an error in
    // `create_async_context`; empty until then.
    non_exists_bucket: String,
}

impl Default for AsyncContext {
    /// Builds both clients from environment variables.
    ///
    /// Reads `TOS_ACCESS_KEY`, `TOS_SECRET_KEY`, `TOS_ENDPOINT` and
    /// `TOS_HTTPS_ENDPOINT`; a missing or unreadable variable falls back to an
    /// empty string. Both bucket-name fields start out empty and the bucket
    /// list starts out empty.
    ///
    /// # Panics
    ///
    /// Panics if either client builder returns an error (`build().unwrap()`).
    fn default() -> Self {
        // NOTE(review): `unwrap_or_default()` would express the same fallback
        // without building the empty string eagerly.
        let ak = env::var("TOS_ACCESS_KEY").unwrap_or("".to_string());
        let sk = env::var("TOS_SECRET_KEY").unwrap_or("".to_string());
        let ep = env::var("TOS_ENDPOINT").unwrap_or("".to_string());
        let https_ep = env::var("TOS_HTTPS_ENDPOINT").unwrap_or("".to_string());
        // Retries are disabled (`max_retry_count(0)`).
        // Timeout units are presumably milliseconds - confirm against the SDK.
        let client = tos::builder::()
            .connection_timeout(3000)
            .request_timeout(10000)
            .max_retry_count(0)
            .ak(ak.clone())
            .sk(sk.clone())
            .region("test-region")
            .endpoint(ep.clone())
            .build().unwrap();
        // Same settings as `client`; only the endpoint differs.
        let https_client = tos::builder::()
            .connection_timeout(3000)
            .request_timeout(10000)
            .max_retry_count(0)
            .ak(ak.clone())
            .sk(sk.clone())
            .region("test-region")
            .endpoint(https_ep.clone())
            .build().unwrap();
        Self {
            client,
            https_client,
            buckets: Mutex::new(vec![]),
            fixed_bucket: "".to_string(),
            non_exists_bucket: "".to_string(),
        }
    }
}

impl AsyncContext {
    /// Returns the client configured with the `TOS_ENDPOINT` endpoint.
    pub fn client(&self) -> &impl TosClient {
        &self.client
    }

    /// Returns the client configured with the `TOS_HTTPS_ENDPOINT` endpoint.
    pub fn https_client(&self) -> &impl TosClient {
        &self.https_client
    }

    /// Returns the name of the bucket created by `create_async_context`
    /// (empty if the context was only default-constructed).
    pub fn fixed_bucket(&self) -> &str {
        &self.fixed_bucket
    }

    /// Returns the bucket name chosen by `create_async_context` because
    /// `head_bucket` failed for it (empty if the context was only
    /// default-constructed).
    pub fn non_exists_bucket(&self) -> &str {
        &self.non_exists_bucket
    }

    /// Registers `bucket` so that `tear_down` will empty and delete it.
    pub async fn add_bucket(&self, bucket: impl Into) {
        let mut buckets = self.buckets.lock().await;
        buckets.push(bucket.into());
    }

    /// Cleans every registered bucket and then the fixed bucket, if one is set.
    ///
    /// Cleanup is best effort: failures inside `clean_bucket` are not reported.
    pub async fn tear_down(&self) {
        // The guard is held across the awaits below, so a concurrent
        // `add_bucket` waits until the loop finishes.
        // NOTE(review): the list is not cleared afterwards, so a second call
        // would attempt the same buckets again.
        let buckets = self.buckets.lock().await;
        for bucket in buckets.iter() {
            self.clean_bucket(bucket).await;
        }
        // NOTE(review): `!self.fixed_bucket.is_empty()` is the idiomatic form.
        if self.fixed_bucket != "" {
            self.clean_bucket(self.fixed_bucket.as_str()).await;
        }
    }

    /// Deletes every object listed in `bucket` and then the bucket itself.
    ///
    /// Objects are listed in pages of at most 1000 keys. If a list call or any
    /// object deletion fails, the walk stops and the bucket is left in place.
    /// The result of the final `delete_bucket` call is discarded. All requests
    /// go through `self.client`.
    async fn clean_bucket(&self, bucket: &str) {
        let mut can_delete_bucket = true;
        let mut input = ListObjectsType2Input::new(bucket);
        input.set_max_keys(1000);
        'outer: loop {
            match self.client.list_objects_type2(&input).await {
                Ok(o) => {
                    for content in o.contents() {
                        // NOTE(review): `.is_err()` would avoid the unused binding.
                        if let Err(_) = self.client.delete_object(&DeleteObjectInput::new(bucket, content.key())).await {
                            // Remaining objects would make bucket deletion
                            // pointless, so abandon the whole walk.
                            can_delete_bucket = false;
                            break 'outer;
                        }
                    }
                    // Last page reached: nothing more to list.
                    if !o.is_truncated() {
                        break;
                    }
                    // Feed the token back so the next call returns the next page.
                    input.set_continuation_token(o.next_continuation_token());
                }
                Err(_) => {
                    can_delete_bucket = false;
                    break;
                }
            }
        }
        if can_delete_bucket {
            let _ = self.client.delete_bucket(&DeleteBucketInput::new(bucket)).await;
        }
    }
}

/// Builds an [`AsyncContext`] and fills in its two bucket-name fields.
///
/// First picks random names until `head_bucket` returns an error for one and
/// stores it as `non_exists_bucket`. Then picks random names until
/// `create_bucket` succeeds and stores that name as `fixed_bucket`.
///
/// # Panics
///
/// Panics if `create_bucket` fails with anything other than a server error
/// whose status code is 409, and (via `AsyncContext::default`) if a client
/// cannot be built.
pub async fn create_async_context() -> AsyncContext {
    let mut ctx = AsyncContext::default();
    let mut non_exists_bucket;
    loop {
        // The argument is presumably the length of the generated name - confirm
        // in `crate::common`.
        non_exists_bucket = gen_random_string(30);
        // NOTE(review): any error (not only "not found") is accepted here, so
        // a network or auth failure also ends this loop.
        if let Err(_) = ctx.client.head_bucket(&HeadBucketInput::new(non_exists_bucket.clone())).await {
            ctx.non_exists_bucket = non_exists_bucket;
            break;
        }
    }
    let mut fixed_bucket;
    loop {
        fixed_bucket = gen_random_string(10);
        match ctx.client.create_bucket(&CreateBucketInput::new(fixed_bucket.clone())).await {
            Ok(_) => {
                ctx.fixed_bucket = fixed_bucket;
                break;
            }
            Err(e) => {
                // Errors that are not server errors abort the setup.
                if !e.is_server_error() {
                    panic!("{}", e.to_string());
                }
                let ex = e.as_server_error().unwrap();
                // Only status 409 is retried, with a freshly generated name on
                // the next iteration; presumably 409 signals a name conflict.
                if ex.status_code() != 409 {
                    panic!("unexpected status code");
                }
            }
        }
    }
    ctx
}

/// Drains `r` with [`read_to_buf`] and converts the bytes to a `String`.
///
/// # Panics
///
/// Panics if the collected bytes are not valid UTF-8, or if `read_to_buf`
/// panics.
pub async fn read_to_string(r: &mut (dyn Stream> + Unpin)) -> String {
    String::from_utf8(read_to_buf(r).await).unwrap()
}

/// Reads every item from `r` until the stream ends and returns the
/// concatenated bytes.
///
/// # Panics
///
/// Panics on the first item that is an error (`result.unwrap()`).
pub async fn read_to_buf(r: &mut (dyn Stream> + Unpin)) -> Vec {
    let mut buf = Vec::new();
    loop {
        match r.next().await {
            // End of stream: hand back everything collected so far.
            None => return buf,
            Some(result) => {
                let x = result.unwrap();
                // NOTE(review): the slice spans the full range `0..x.len()`,
                // so it selects the whole chunk.
                buf.extend_from_slice(x.slice(0..x.len()).as_ref());
            }
        }
    }
}