//! # google-cloud-pubsub //! //! Google Cloud Platform pub/sub library. //! //! * [About Cloud Pub/Sub](https://cloud.google.com/pubsub/) //! * [Pub/Sub API Documentation](https://cloud.google.com/pubsub/docs) //! //! ## Quickstart //! //! ### Authentication //! There are two ways to create a client that is authenticated against the google cloud. //! //! #### Automatically //! //! The function `with_auth()` will try and read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_APPLICATION_CREDENTIALS_JSON` or //! from a metadata server. //! //! This is also described in [google-cloud-auth](https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/README.md) //! //! ```rust //! use google_cloud_pubsub::client::{ClientConfig, Client}; //! //! async fn run() { //! let config = ClientConfig::default().with_auth().await.unwrap(); //! let client = Client::new(config).await.unwrap(); //! } //! ``` //! //! ### Manually //! //! When you cant use the `gcloud` authentication but you have a different way to get your credentials (e.g a different environment variable) //! you can parse your own version of the 'credentials-file' and use it like that: //! //! ```rust //! use google_cloud_auth::credentials::CredentialsFile; //! // or google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile //! use google_cloud_pubsub::client::{ClientConfig, Client}; //! //! async fn run(cred: CredentialsFile) { //! let config = ClientConfig::default().with_credentials(cred).await.unwrap(); //! let client = Client::new(config).await.unwrap(); //! } //! ``` //! //! ### Emulator //! For tests, you can use the [Emulator-Option](https://github.com/yoshidan/google-cloud-rust/blob/cbd5ed1315d7b828c89a50fe71fcbaf15ddc964b/pubsub/src/client.rs#L32) like that: //! Before executing the program, specify the address of the emulator in the following environment variable. //! //! ```sh //! export PUBSUB_EMULATOR_HOST=localhost:8681 //! ``` //! //! ### Publish Message //! //! ``` //! use google_cloud_pubsub::client::{Client, ClientConfig}; //! use google_cloud_googleapis::pubsub::v1::PubsubMessage; //! use google_cloud_pubsub::topic::TopicConfig; //! use google_cloud_pubsub::subscription::SubscriptionConfig; //! use google_cloud_gax::grpc::Status; //! use tokio::task::JoinHandle; //! use tokio_util::sync::CancellationToken; //! //! async fn run(config: ClientConfig) -> Result<(), Status> { //! //! // Create pubsub client. //! let client = Client::new(config).await.unwrap(); //! //! // Create topic. //! let topic = client.topic("test-topic"); //! if !topic.exists(None).await? { //! topic.create(None, None).await?; //! } //! //! // Start publisher. //! let publisher = topic.new_publisher(None); //! //! // Publish message. //! let tasks : Vec>> = (0..10).into_iter().map(|_i| { //! let publisher = publisher.clone(); //! tokio::spawn(async move { //! let msg = PubsubMessage { //! data: "abc".into(), //! // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering) //! ordering_key: "order".into(), //! ..Default::default() //! }; //! //! // Send a message. There are also `publish_bulk` and `publish_immediately` methods. //! let mut awaiter = publisher.publish(msg).await; //! //! // The get method blocks until a server-generated ID or an error is returned for the published message. //! awaiter.get().await //! }) //! }).collect(); //! //! // Wait for all publish task finish //! for task in tasks { //! let message_id = task.await.unwrap()?; //! } //! //! // Wait for publishers in topic finish. //! let mut publisher = publisher; //! publisher.shutdown(); //! //! Ok(()) //! } //! ``` //! //! ### Subscribe Message //! //! ``` //! use google_cloud_pubsub::client::{Client, ClientConfig}; //! use google_cloud_googleapis::pubsub::v1::PubsubMessage; //! use google_cloud_pubsub::subscription::SubscriptionConfig; //! use google_cloud_gax::grpc::Status; //! use std::time::Duration; //! use tokio_util::sync::CancellationToken; //! use futures_util::StreamExt; //! //! async fn run(config: ClientConfig) -> Result<(), Status> { //! //! // Create pubsub client. //! let client = Client::new(config).await.unwrap(); //! //! // Get the topic to subscribe to. //! let topic = client.topic("test-topic"); //! //! // Create subscription //! // If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the //! // name will be treated as a fully qualified resource name //! let config = SubscriptionConfig { //! // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering) //! enable_message_ordering: true, //! ..Default::default() //! }; //! //! // Create subscription //! let subscription = client.subscription("test-subscription"); //! if !subscription.exists(None).await? { //! subscription.create(topic.fully_qualified_name(), config, None).await?; //! } //! //! // Token for cancel. //! let cancel = CancellationToken::new(); //! let cancel2 = cancel.clone(); //! tokio::spawn(async move { //! // Cancel after 10 seconds. //! tokio::time::sleep(Duration::from_secs(10)).await; //! cancel2.cancel(); //! }); //! //! // Receive blocks until the ctx is cancelled or an error occurs. //! // Or simply use the `subscription.subscribe` method. //! subscription.receive(|mut message, cancel| async move { //! // Handle data. //! println!("Got Message: {:?}", message.message.data); //! //! // Ack or Nack message. //! let _ = message.ack().await; //! }, cancel.clone(), None).await?; //! //! // Delete subscription if needed. //! subscription.delete(None).await?; //! //! Ok(()) //! } //! ``` //! //! ### Subscribe Message (Alternative Way) //! //! After canceling, wait until all pulled messages are processed. //! ``` //! use std::time::Duration; //! use futures_util::StreamExt; //! use google_cloud_pubsub::client::{Client, ClientConfig}; //! use google_cloud_googleapis::pubsub::v1::PubsubMessage; //! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig}; //! use google_cloud_gax::grpc::Status; //! //! async fn run(config: ClientConfig) -> Result<(), Status> { //! // Creating Client, Topic and Subscription... //! let client = Client::new(config).await.unwrap(); //! let subscription = client.subscription("test-subscription"); //! //! // Read the messages as a stream //! let mut stream = subscription.subscribe(None).await.unwrap(); //! let cancellable = stream.cancellable(); //! let task = tokio::spawn(async move { //! // None if the stream is cancelled //! while let Some(message) = stream.next().await { //! message.ack().await.unwrap(); //! } //! }); //! tokio::time::sleep(Duration::from_secs(60)).await; //! cancellable.cancel(); //! let _ = task.await; //! Ok(()) //! } //! ``` //! //! Unprocessed messages are nack after cancellation. //! ``` //! use std::time::Duration; //! use google_cloud_pubsub::client::{Client, ClientConfig}; //! use google_cloud_googleapis::pubsub::v1::PubsubMessage; //! use google_cloud_pubsub::subscription::{SubscribeConfig, SubscriptionConfig}; //! use google_cloud_gax::grpc::Status; //! //! async fn run(config: ClientConfig) -> Result<(), Status> { //! // Creating Client, Topic and Subscription... //! let client = Client::new(config).await.unwrap(); //! let subscription = client.subscription("test-subscription"); //! //! // Read the messages as a stream //! let mut stream = subscription.subscribe(None).await.unwrap(); //! let cancellable = stream.cancellable(); //! let task = tokio::spawn(async move { //! // None if the tream is cancelled //! while let Some(message) = stream.read().await { //! message.ack().await.unwrap(); //! } //! }); //! tokio::time::sleep(Duration::from_secs(60)).await; //! cancellable.cancel(); //! let _ = task.await; //! Ok(()) //! } //! ``` pub mod apiv1; pub mod client; pub mod publisher; pub mod subscriber; pub mod subscription; pub mod topic; pub mod util;