// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use std::collections::HashMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use futures::executor::block_on; // use futures::StreamExt; use google_cloud_rust_raw::bigtable::admin::v2::{ bigtable_instance_admin::GetClusterRequest, bigtable_instance_admin_grpc::BigtableInstanceAdminClient, bigtable_table_admin::CreateTableRequest, bigtable_table_admin::DeleteTableRequest, bigtable_table_admin::ListTablesRequest, bigtable_table_admin_grpc::BigtableTableAdminClient, instance::Cluster, table::ColumnFamily, table::GcRule, table::Table, }; use google_cloud_rust_raw::bigtable::v2::{ bigtable::mutate_rows_request::Entry, data::mutation::SetCell, data::Mutation, }; /* use google_cloud_rust_raw::bigtable::v2::{ bigtable::MutateRowResponse, bigtable::MutateRowsRequest, bigtable_grpc::BigtableClient, }; */ use google_cloud_rust_raw::empty::Empty; use grpcio::{Channel, ChannelBuilder, ChannelCredentials, ClientUnaryReceiver, EnvBuilder}; use protobuf::well_known_types::duration::Duration; use protobuf::MessageField; #[allow(dead_code)] fn timestamp() -> u128 { let start = SystemTime::now(); let time = start .duration_since(UNIX_EPOCH) .expect("Failed to fetch timestamp"); time.as_micros() } /// Create a new channel used for the different types of clients fn connect(endpoint: &str) -> Channel { // Set up the gRPC environment. let env = Arc::new(EnvBuilder::new().build()); let creds = ChannelCredentials::google_default_credentials().expect("No Google credentials found"); // Create a channel to connect to Gcloud. ChannelBuilder::new(env) // Set the max size to correspond to server-side limits. .max_send_message_len(1 << 28) .max_receive_message_len(1 << 28) .set_credentials(creds) .connect(&endpoint) } /// Returns the cluster information /// fn get_cluster( client: &BigtableInstanceAdminClient, cluster_id: &str, ) -> ::grpcio::Result { println!("Get cluster information"); let request = GetClusterRequest { name: cluster_id.to_string(), ..Default::default() }; client.get_cluster(&request) } /// Lists all tables for a given cluster /// fn list_tables(client: &BigtableTableAdminClient, instance_id: &str) { println!("List all existing tables"); let request = ListTablesRequest { parent: instance_id.to_string(), ..Default::default() }; match client.list_tables(&request) { Ok(response) => { response .tables .iter() .for_each(|table| println!(" table: {:?}", table)); } Err(error) => println!("Failed to list tables: {}", error), } } /// Create a new table in the BigTable cluster /// fn create_table( client: &BigtableTableAdminClient, instance_id: &str, table_name: &str, table: Table, ) -> ::grpcio::Result { println!("Creating table {}", table_name); let request = CreateTableRequest { parent: instance_id.to_string(), table: MessageField::some(table), table_id: "hello-world".to_string(), ..Default::default() }; client.create_table(&request) } /// Deletes a table asynchronously, returns a future fn delete_table_async( client: &BigtableTableAdminClient, table_name: &str, ) -> grpcio::Result> { println!("Deleting the {} table", table_name); let request = DeleteTableRequest { name: table_name.to_string(), ..Default::default() }; client.delete_table_async(&request) } async fn async_main() { // BigTable project id let _project_id = String::from("mozilla-rust-sdk-dev"); // The BigTable instance id let instance_id = String::from("projects/mozilla-rust-sdk-dev/instances/mozilla-rust-sdk"); // The cluster id let cluster_id = String::from( "projects/mozilla-rust-sdk-dev/instances/mozilla-rust-sdk/clusters/mozilla-rust-sdk-c1", ); // Google Cloud configuration. let admin_endpoint = "bigtableadmin.googleapis.com"; // The table name let table_name = String::from("projects/mozilla-rust-sdk-dev/instances/mozilla-rust-sdk/tables/hello-world"); let column_family_id = "cf1"; // Create a Bigtable client. let channel = connect(admin_endpoint); let admin_client = BigtableInstanceAdminClient::new(channel.clone()); // display cluster information let cluster = get_cluster(&admin_client, &cluster_id).unwrap(); dbg!(cluster); // create admin client for tables let admin_client = BigtableTableAdminClient::new(channel.clone()); // display current tables list_tables(&admin_client, &instance_id); // create a new table with a custom column family / gc rule let duration = Duration { seconds: 60 * 60 * 24 * 5, ..Default::default() }; let mut gc_rule = GcRule::new(); gc_rule.set_max_num_versions(2); gc_rule.set_max_age(duration); let column_family = ColumnFamily { gc_rule: MessageField::some(gc_rule), ..Default::default() }; let mut hash_map = HashMap::new(); hash_map.insert(column_family_id.to_string(), column_family); let table = Table { column_families: hash_map, ..Default::default() }; match create_table(&admin_client, &instance_id, &table_name, table) { Ok(table) => println!(" table {:?} created", table), Err(error) => println!(" failed to created table: {}", error), } // insert entries into new table println!("Insert entries into table"); let greetings = vec!["Hello World!", "Hello Cloud!", "Hello Rust!"]; let mut mutation_requests = Vec::new(); let column = "greeting"; for (i, greeting) in greetings.iter().enumerate() { let row_key = format!("greeting{}", i); let set_cell = SetCell { column_qualifier: column.to_string().into_bytes(), timestamp_micros: -1, value: greeting.to_string().into_bytes(), family_name: column_family_id.to_string(), ..Default::default() }; let mut mutation = Mutation::new(); mutation.set_set_cell(set_cell); let entry = Entry { row_key: row_key.into_bytes(), mutations: vec![mutation], ..Default::default() }; mutation_requests.push(entry); } // TODO:: fix this.admin_client // `.collect()` needs a type. // apply changes and check responses /* let channel = connect(endpoint); let request = MutateRowsRequest { table_name: table_name.to_string(), entries: mutation_requests, ..Default::default() }; let client = BigtableClient::new(channel.clone()); let response = client .mutate_rows(&request) .unwrap() .collect::>>() .await .unwrap(); for response in response.iter() { for entry in response.get_entries().iter() { let status = entry.get_status(); println!( " entry index: {}, status: {} - {}", entry.get_index(), status.code, status.message ); } } */ // display all tables, should include new table list_tables(&admin_client, &instance_id); // delete the table delete_table_async(&admin_client, &table_name) .unwrap() .await .map_err(|e| dbg!(e)) .expect("Failure"); // list of tables should not have deleted table list_tables(&admin_client, &instance_id); } fn main() { block_on(async_main()) }