From 253952ad5c6664e4c3b1d0758356bdc261eedcab Mon Sep 17 00:00:00 2001
From: Owen Nelson
Date: Wed, 9 Aug 2023 10:17:26 -0700
Subject: [PATCH] add support for GCP PubSub

Largely this is a port of code written for `svix-bridge` in the
`generic-queue` crate.

Adds tests modeled after the rest of the backends here, but with
GCP-specific setup and configuration.

Scheduled delivery is not implemented since it is not natively supported
by GCP PubSub. There are workarounds, but they rely on conventions
established by the calling code, so I'm not sure it makes sense to
codify them here in the lib.
---
 omniqueue/Cargo.toml                 |  12 +-
 omniqueue/src/backends/gcp_pubsub.rs | 272 +++++++++++++++++++++++++++
 omniqueue/src/backends/mod.rs        |   2 +
 omniqueue/tests/gcp_pubsub.rs        | 198 +++++++++++++++++++
 testing-docker-compose.yml           |  10 +
 5 files changed, 493 insertions(+), 1 deletion(-)
 create mode 100644 omniqueue/src/backends/gcp_pubsub.rs
 create mode 100644 omniqueue/tests/gcp_pubsub.rs

diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml
index 89887e3..c32923b 100644
--- a/omniqueue/Cargo.toml
+++ b/omniqueue/Cargo.toml
@@ -16,6 +16,9 @@ aws-sdk-sqs = { version = "0.25", optional = true }
 bb8 = { version = "0.8", optional = true }
 bb8-redis = { version = "0.13", optional = true }
 futures = { version = "0.3", default-features = false, features = ["async-await", "std"] }
+futures-util = { version = "0.3.28", optional = true }
+google-cloud-googleapis = { version = "0.10.0", optional = true }
+google-cloud-pubsub = { version = "0.18.0", optional = true }
 lapin = { version = "2", optional = true }
 rdkafka = { version = "0.29", features = ["cmake-build", "ssl", "tracing"] }
 redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
@@ -23,6 +26,7 @@ serde = { version = "1", features = ["derive", "rc"] }
 serde_json = "1"
 thiserror = "1"
 tokio = { version = "1", features = ["full"] }
+tokio-util = { version = "0.7", optional = true }
 tracing = "0.1"
 
 [dev-dependencies]
@@ -31,7 +35,13 @@ tokio-executor-trait = "2.1"
 tokio-reactor-trait = "1.1"
 
 [features]
-default = ["memory_queue", "rabbitmq", "redis", "redis_cluster", "sqs"]
+default = ["memory_queue", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
+gcp_pubsub = [
+    "dep:futures-util",
+    "dep:google-cloud-googleapis",
+    "dep:google-cloud-pubsub",
+    "dep:tokio-util",
+]
 memory_queue = []
 rabbitmq = ["dep:lapin"]
 redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs
new file mode 100644
index 0000000..5256fea
--- /dev/null
+++ b/omniqueue/src/backends/gcp_pubsub.rs
@@ -0,0 +1,272 @@
+use crate::{
+    decoding::DecoderRegistry,
+    encoding::{CustomEncoder, EncoderRegistry},
+    queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
+    QueueError,
+};
+use async_trait::async_trait;
+use futures_util::StreamExt;
+use google_cloud_googleapis::pubsub::v1::PubsubMessage;
+use google_cloud_pubsub::client::{
+    google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig,
+};
+use google_cloud_pubsub::subscriber::ReceivedMessage;
+use google_cloud_pubsub::subscription::Subscription;
+use serde::Serialize;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::{any::TypeId, collections::HashMap};
+
+pub struct GcpPubSubBackend;
+
+type Payload = Vec<u8>;
+type Encoders = EncoderRegistry<Payload>;
+type Decoders = DecoderRegistry<Payload>;
+
+// FIXME: topic/subscription are each for read/write. Split config up?
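+// As a sketch of intended use (a hypothetical config; the ids are placeholders,
+// and the topic/subscription are assumed to already exist):
+//
+//     let config = GcpPubSubConfig {
+//         topic_id: "my-topic".to_string(),
+//         subscription_id: "my-subscription".to_string(),
+//         credentials_file: None, // `None` falls back to env-based auth
+//     };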
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct GcpPubSubConfig {
+    pub topic_id: String,
+    pub subscription_id: String,
+    pub credentials_file: Option<PathBuf>,
+}
+
+/// Make a `ClientConfig` from a `CredentialsFile` on disk.
+async fn configure_client_from_file<P: AsRef<Path>>(
+    cred_file_path: P,
+) -> Result<ClientConfig, QueueError> {
+    let bytes = std::fs::read(cred_file_path).map_err(QueueError::generic)?;
+    let creds: CredentialsFile = serde_json::from_slice(&bytes).map_err(QueueError::generic)?;
+    ClientConfig::default()
+        .with_credentials(creds)
+        .await
+        .map_err(QueueError::generic)
+}
+
+/// Making a `ClientConfig` via env vars is possible in two ways:
+/// - setting `GOOGLE_APPLICATION_CREDENTIALS` to the file path to have it loaded automatically
+/// - setting `GOOGLE_APPLICATION_CREDENTIALS_JSON` to the file contents (avoiding the need for a
+///   file on disk).
+async fn configure_client_from_env() -> Result<ClientConfig, QueueError> {
+    ClientConfig::default()
+        .with_auth()
+        .await
+        .map_err(QueueError::generic)
+}
+
+async fn get_client(cfg: &GcpPubSubConfig) -> Result<Client, QueueError> {
+    let config = {
+        if let Some(fp) = &cfg.credentials_file {
+            tracing::trace!("reading gcp creds from file: {}", fp.display());
+            configure_client_from_file(&fp).await?
+        } else {
+            tracing::trace!("reading gcp creds from env");
+            configure_client_from_env().await?
+        }
+    };
+    Client::new(config).await.map_err(QueueError::generic)
+}
+
+impl GcpPubSubConsumer {
+    async fn new(
+        client: Client,
+        subscription_id: String,
+        registry: Decoders,
+    ) -> Result<Self, QueueError> {
+        Ok(Self {
+            client,
+            registry,
+            subscription_id: Arc::new(subscription_id),
+        })
+    }
+}
+
+impl GcpPubSubProducer {
+    async fn new(client: Client, topic_id: String, registry: Encoders) -> Result<Self, QueueError> {
+        let topic = client.topic(&topic_id);
+        // Only warn if the topic doesn't exist at this point.
+        // If it gets created later, we should still be able to use it once it's available;
+        // if it's still missing when we try to send, that's an error.
+        if !topic.exists(None).await.map_err(QueueError::generic)? {
+            tracing::warn!("topic {} does not exist", &topic_id);
+        }
+        Ok(Self {
+            client,
+            registry,
+            topic_id: Arc::new(topic_id),
+        })
+    }
+}
+
+#[async_trait]
+impl QueueBackend for GcpPubSubBackend {
+    type Config = GcpPubSubConfig;
+
+    type PayloadIn = Payload;
+    type PayloadOut = Payload;
+
+    type Producer = GcpPubSubProducer;
+    type Consumer = GcpPubSubConsumer;
+
+    async fn new_pair(
+        config: Self::Config,
+        custom_encoders: Encoders,
+        custom_decoders: Decoders,
+    ) -> Result<(GcpPubSubProducer, GcpPubSubConsumer), QueueError> {
+        let client = get_client(&config).await?;
+        Ok((
+            GcpPubSubProducer::new(client.clone(), config.topic_id, custom_encoders).await?,
+            GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await?,
+        ))
+    }
+
+    async fn producing_half(
+        config: Self::Config,
+        custom_encoders: EncoderRegistry<Payload>,
+    ) -> Result<Self::Producer, QueueError> {
+        let client = get_client(&config).await?;
+        GcpPubSubProducer::new(client, config.topic_id, custom_encoders).await
+    }
+
+    async fn consuming_half(
+        config: Self::Config,
+        custom_decoders: DecoderRegistry<Payload>,
+    ) -> Result<Self::Consumer, QueueError> {
+        let client = get_client(&config).await?;
+        GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await
+    }
+}
+
+pub struct GcpPubSubProducer {
+    client: Client,
+    registry: Encoders,
+    topic_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubProducer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubProducer")
+            .field("topic_id", &self.topic_id)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl QueueProducer for GcpPubSubProducer {
+    type Payload = Payload;
+
+    fn get_custom_encoders(&self) -> &HashMap<TypeId, Box<dyn CustomEncoder<Self::Payload>>> {
+        self.registry.as_ref()
+    }
+
+    async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
+        let msg = PubsubMessage {
+            data: payload.to_vec(),
+            ..Default::default()
+        };
+
+        // N.b. defer the creation of a publisher/topic until needed. Helps recover when
+        // the topic does not yet exist, but will soon.
+        // Might be more expensive to recreate each time, but overall more reliable.
+        let topic = self.client.topic(&self.topic_id);
+
+        // Publishing to a non-existent topic will cause the publisher to wait (forever?).
+        // Returning an error here allows dependents to handle that case immediately,
+        // instead of holding the connection open indefinitely.
+        if !topic.exists(None).await.map_err(QueueError::generic)? {
+            return Err(QueueError::Generic(
+                format!("topic {} does not exist", &topic.id()).into(),
+            ));
+        }
+        // FIXME: may need to expose `PublisherConfig` to the caller so they can tweak this
+        let publisher = topic.new_publisher(None);
+        let awaiter = publisher.publish(msg).await;
+        awaiter.get().await.map_err(QueueError::generic)?;
+        Ok(())
+    }
+
+    async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<(), QueueError> {
+        self.send_raw(&serde_json::to_vec(&payload)?).await
+    }
+}
+
+pub struct GcpPubSubConsumer {
+    client: Client,
+    registry: Decoders,
+    subscription_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubConsumer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubConsumer")
+            .field("subscription_id", &self.subscription_id)
+            .finish()
+    }
+}
+
+async fn subscription(client: &Client, subscription_id: &str) -> Result<Subscription, QueueError> {
+    let subscription = client.subscription(subscription_id);
+    if !subscription
+        .exists(None)
+        .await
+        .map_err(QueueError::generic)?
+    {
+        return Err(QueueError::Generic(
+            format!("subscription {} does not exist", &subscription_id).into(),
+        ));
+    }
+    Ok(subscription)
+}
+
+#[async_trait]
+impl QueueConsumer for GcpPubSubConsumer {
+    type Payload = Payload;
+
+    async fn receive(&mut self) -> Result<Delivery, QueueError> {
+        let subscription = subscription(&self.client, &self.subscription_id).await?;
+        let mut stream = subscription
+            .subscribe(None)
+            .await
+            .map_err(QueueError::generic)?;
+
+        let mut recv_msg = stream.next().await.ok_or_else(|| QueueError::NoData)?;
+        // FIXME: would be nice to avoid having to move the data out here.
+        // While it's possible to ack via a subscription and an ack_id, nack is only
+        // possible via a `ReceivedMessage`. This means we either need to hold 2 copies of
+        // the payload, or move the bytes out so they can be returned _outside of the Acker_.
+        let payload = recv_msg.message.data.drain(..).collect();
+        Ok(Delivery {
+            decoders: self.registry.clone(),
+            acker: Box::new(GcpPubSubAcker {
+                recv_msg,
+                subscription_id: self.subscription_id.clone(),
+            }),
+            payload: Some(payload),
+        })
+    }
+}
+
+pub struct GcpPubSubAcker {
+    recv_msg: ReceivedMessage,
+    subscription_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubAcker {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubAcker")
+            .field("ack_id", &self.recv_msg.ack_id())
+            .field("message_id", &self.recv_msg.message.message_id)
+            .field("subscription_id", &self.subscription_id)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl Acker for GcpPubSubAcker {
+    async fn ack(&mut self) -> Result<(), QueueError> {
+        self.recv_msg.ack().await.map_err(QueueError::generic)
+    }
+
+    async fn nack(&mut self) -> Result<(), QueueError> {
+        self.recv_msg.nack().await.map_err(QueueError::generic)
+    }
+}
diff --git a/omniqueue/src/backends/mod.rs b/omniqueue/src/backends/mod.rs
index a346c28..8d4c2a9 100644
--- a/omniqueue/src/backends/mod.rs
+++ b/omniqueue/src/backends/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "gcp_pubsub")]
+pub mod gcp_pubsub;
 #[cfg(feature = "memory_queue")]
 pub mod memory_queue;
 #[cfg(feature = "rabbitmq")]
diff --git a/omniqueue/tests/gcp_pubsub.rs b/omniqueue/tests/gcp_pubsub.rs
new file mode 100644
index 0000000..b56d1f6
--- /dev/null
+++ b/omniqueue/tests/gcp_pubsub.rs
@@ -0,0 +1,198 @@
+//! Support for Google Cloud Pub/Sub.
+//!
+//! In this system, subscriptions are like queue bindings to topics.
+//! Consumers need a subscription id to start receiving messages.
+//! We don't have any public API for managing/creating/deleting subscriptions in this module, so
+//! this is left to the user to do via whatever method they like.
+//!
+//! -
+//! -
+//! - (how to publish messages ad hoc, helpful for debugging)
+//!
+//! Don't have a better place to mention this just yet.
+//! When testing against the gcloud emulator, you need to set `PUBSUB_EMULATOR_HOST` to the bind
+//! address, and `PUBSUB_PROJECT_ID` (matching however the emulator was configured).
+//! This should bypass the need for credentials and so on.
+//! ```sh
+//! export PUBSUB_EMULATOR_HOST=localhost:8085
+//! export PUBSUB_PROJECT_ID=local-project
+//! ```
+//! > N.b. the rust client hardcodes the project id to `local-project` when it sees the
+//! > `PUBSUB_EMULATOR_HOST` env var in use, so if you see errors about resources not found etc.,
+//! > it might be because of a project mismatch.
+//!
+//! To use the `gcloud` CLI with the emulator (useful for creating topics/subscriptions), you have
+//! to configure an override for the pubsub API:
+//!
+//! ```sh
+//! gcloud config set api_endpoint_overrides/pubsub "http://${PUBSUB_EMULATOR_HOST}/"
+//! ```
+//! Note that you'll also have to manually set it back to the default as needed:
+//! ```sh
+//! gcloud config unset api_endpoint_overrides/pubsub
+//! ```
+//! h/t
+//!
+//! Also note, and this is odd, `gcloud` will prompt you to log in even though you're trying to
+//! connect to a local process.
+//! Go ahead and follow the prompts to get your CLI working.
+//!
+//! I guess it still wants to talk to GCP for interactions other than the pubsub API.
+//!
+//! ## Example `gcloud` usage:
+//! ```sh
+//! gcloud --project=local-project pubsub topics create tester
+//! gcloud --project=local-project pubsub topics create dead-letters
+//! gcloud --project=local-project pubsub subscriptions create local-1 \
+//!     --topic=tester \
+//!     --dead-letter-topic=dead-letters \
+//!     --max-delivery-attempts=5
+//! gcloud --project local-project pubsub topics publish tester --message='{"my message": 1234}'
+//! ```
+//!
+
+use google_cloud_googleapis::pubsub::v1::DeadLetterPolicy;
+use google_cloud_pubsub::client::{Client, ClientConfig};
+use google_cloud_pubsub::subscription::SubscriptionConfig;
+
+use omniqueue::backends::gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig};
+use omniqueue::queue::{
+    consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static,
+};
+use serde::{Deserialize, Serialize};
+
+const DEFAULT_PUBSUB_EMULATOR_HOST: &str = "localhost:8085";
+/// Controls how many times a message can be nack'd before it lands on the dead letter topic.
+const MAX_DELIVERY_ATTEMPTS: i32 = 5;
+
+async fn get_client() -> Client {
+    // The `Default` impl for `ClientConfig` looks for this env var. When set, it branches into
+    // local-mode use with the address from the env var and a hardcoded project id of
+    // `local-project`.
+    if std::env::var("PUBSUB_EMULATOR_HOST").is_err() {
+        std::env::set_var("PUBSUB_EMULATOR_HOST", DEFAULT_PUBSUB_EMULATOR_HOST);
+    }
+    Client::new(ClientConfig::default()).await.unwrap()
+}
+
+// FIXME: check to see if there's already one of these in here somewhere
+fn random_chars() -> impl Iterator<Item = char> {
+    std::iter::repeat_with(fastrand::alphanumeric)
+}
+
+/// Returns a [`QueueBuilder`] configured to connect to the Pub/Sub emulator spawned by the file
+/// `testing-docker-compose.yml` in the root of the repository.
+///
+/// Additionally this will create a temporary topic and subscription on that instance for the
+/// duration of the test so as to ensure there is no stealing.
+async fn make_test_queue() -> QueueBuilder<GcpPubSubBackend, Static> {
+    let client = get_client().await;
+
+    let topic_name: String = "topic-".chars().chain(random_chars().take(8)).collect();
+    // Need to define a dead letter topic to keep the "bad" test cases from pulling the nacked
+    // messages again and again.
+    let dead_letter_topic_name: String = "topic-".chars().chain(random_chars().take(8)).collect();
+    let subscription_name: String = "subscription-"
+        .chars()
+        .chain(random_chars().take(8))
+        .collect();
+
+    let topic = client.create_topic(&topic_name, None, None).await.unwrap();
+    let dead_letter_topic = client
+        .create_topic(&dead_letter_topic_name, None, None)
+        .await
+        .unwrap();
+    let subscription = client
+        .create_subscription(
+            &subscription_name,
+            &topic_name,
+            SubscriptionConfig {
+                // Messages published to the topic need to supply a unique ID to make use of this
+                enable_exactly_once_delivery: true,
+                dead_letter_policy: Some(DeadLetterPolicy {
+                    dead_letter_topic: dead_letter_topic.fully_qualified_name().into(),
+                    max_delivery_attempts: MAX_DELIVERY_ATTEMPTS,
+                }),
+                ..Default::default()
+            },
+            None,
+        )
+        .await
+        .unwrap();
+
+    let config = GcpPubSubConfig {
+        topic_id: topic.id(),
+        subscription_id: subscription.id(),
+        credentials_file: None,
+    };
+
+    GcpPubSubBackend::builder(config)
+}
+
+#[tokio::test]
+async fn test_raw_send_recv() {
+    let payload = b"{\"test\": \"data\"}";
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_raw(&payload.to_vec()).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+}
+
+#[tokio::test]
+async fn test_bytes_send_recv() {
+    let payload = b"hello";
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_bytes(payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct ExType {
+    a: u8,
+}
+
+#[tokio::test]
+async fn test_serde_send_recv() {
+    let payload = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn test_custom_send_recv() {
+    let payload = ExType { a: 3 };
+
+    let encoder = |p: &ExType| Ok(vec![p.a]);
+    let decoder = |p: &Vec<u8>| {
+        Ok(ExType {
+            a: p.first().copied().unwrap_or(0),
+        })
+    };
+
+    let (p, mut c) = make_test_queue()
+        .await
+        .with_encoder(encoder)
+        .with_decoder(decoder)
+        .build_pair()
+        .await
+        .unwrap();
+
+    p.send_custom(&payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.payload_custom::<ExType>().unwrap().unwrap(), payload);
+
+    // Because it doesn't use JSON, this should fail:
+    d.payload_serde_json::<ExType>().unwrap_err();
+    d.ack().await.unwrap();
+}
diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml
index 21c71c0..c8b8b6a 100644
--- a/testing-docker-compose.yml
+++ b/testing-docker-compose.yml
@@ -72,3 +72,13 @@ services:
       REDIS_NODES: "redis-cluster redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2 redis-cluster-node-3 redis-cluster-node-4"
     ports:
       - "6385:6379"
+
+  gcp-pubsub:
+    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
+    ports:
+      - "8085:8085"
+    command: [
+      "gcloud", "beta", "emulators", "pubsub", "start",
+      "--project", "local-project",
+      "--host-port", "0.0.0.0:8085"
+    ]
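
For reference, a minimal end-to-end sketch of how the new backend is meant to be driven, mirroring the calls exercised in the tests above. The topic and subscription ids are placeholders and are assumed to already exist (created out of band, e.g. via `gcloud`):

```rust
use omniqueue::backends::gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig};
use omniqueue::queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder ids; both resources must already exist.
    let config = GcpPubSubConfig {
        topic_id: "my-topic".to_string(),
        subscription_id: "my-subscription".to_string(),
        // `None` falls back to the GOOGLE_APPLICATION_CREDENTIALS* env vars
        // (or the emulator, when PUBSUB_EMULATOR_HOST is set).
        credentials_file: None,
    };

    let (producer, mut consumer) = GcpPubSubBackend::builder(config).build_pair().await?;

    producer
        .send_serde_json(&serde_json::json!({ "test": "data" }))
        .await?;

    let delivery = consumer.receive().await?;
    println!("got: {:?}", delivery.borrow_payload());
    delivery.ack().await?;
    Ok(())
}
```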
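To run the new tests locally against the emulator, something like the following should work from the repository root (assuming a recent `docker compose`; the service name comes from the compose file above, and the tests create their own randomly named topics and subscriptions):

```sh
docker compose -f testing-docker-compose.yml up -d gcp-pubsub
cargo test -p omniqueue --test gcp_pubsub
```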