pub mod types; pub mod compression; pub mod mongo { use eyre::Result; use mongodb::bson::Document; use mongodb::error::{ErrorKind, WriteFailure}; use mongodb::{Client as MongoClient, Collection}; use std::env; pub async fn connect_mongo() -> Result { let mongo_uri = env::var("MONGO_URI")?; let client = MongoClient::with_uri_str(&mongo_uri).await?; return Ok(client); } pub async fn write_to_collection( message: String, collection: &Collection, ) -> Result<()> { println!("Writing to collection: {:?}", message); let json_value = serde_json::from_str(&message).unwrap(); let document = Document::from(json_value); let _ = match collection.insert_one(document, None).await { Ok(_) => { println!("Successfully written to collection!"); Ok(()) } Err(e) => match e.kind.as_ref() { ErrorKind::Write(WriteFailure::WriteError(write_error)) if write_error.code == 11000 => { println!("Duplicate key error: {:?}", e); // Handle duplicate key error, e.g., by updating the existing document or ignoring the insert Err(eyre::Report::new(e)) } _ => { println!("Error writing to collection: {:?}", e); Err(eyre::Report::new(e)) } }, }; return Ok(()); } } pub mod gcppubsub { use chrono::{Utc, DateTime}; use eyre::Result; use google_cloud_auth::credentials::CredentialsFile; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::publisher::Publisher; use google_cloud_pubsub::subscription::Subscription; use std::env; pub async fn connect_pub_sub() -> Result { let service_account_json = env::var("SERVICE_ACCOUNT_JSON")?; let credentials = CredentialsFile::new_from_str(&service_account_json).await?; let config = ClientConfig::default() .with_credentials(credentials) .await .unwrap(); let client = Client::new(config).await.unwrap(); return Ok(client); } pub async fn get_subscription(client: Client, sub_name: &str) -> Result { let subscription = client.subscription(&sub_name); return Ok(subscription); } pub async fn get_publisher(client: Client, topic_name: &str) -> Result { let topic: google_cloud_pubsub::topic::Topic = client.topic(topic_name); if !topic.exists(None).await? { panic!("Topic does not exist") } let publisher = topic.new_publisher(None); return Ok(publisher); } pub async fn publish_message(publisher: &Publisher, message: String, ordering_key: Option) -> Result<()> { //println!("Publishing message: {:?}", message); let publisher = publisher.clone(); let task = tokio::spawn(async move { let msg = PubsubMessage { data: message.into_bytes(), ordering_key: ordering_key.unwrap(), // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering) ..Default::default() }; // Send a message. There are also `publish_bulk` and `publish_immediately` methods. let mut awaiter = publisher.publish(msg).await; // The get method blocks until a server-generated ID or an error is returned for the published message. awaiter.get().await }); let message_id = task.await.unwrap()?; println!("Published message with ID: {:?} at time {:?}", message_id, Utc::now()); Ok(()) } } pub mod blockchain;