diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml
index 89887e3..c32923b 100644
--- a/omniqueue/Cargo.toml
+++ b/omniqueue/Cargo.toml
@@ -16,6 +16,9 @@ aws-sdk-sqs = { version = "0.25", optional = true }
 bb8 = { version = "0.8", optional = true }
 bb8-redis = { version = "0.13", optional = true }
 futures = { version = "0.3", default-features = false, features = ["async-await", "std"] }
+futures-util = { version = "0.3.28", optional = true }
+google-cloud-googleapis = { version = "0.10.0", optional = true }
+google-cloud-pubsub = { version = "0.18.0", optional = true }
 lapin = { version = "2", optional = true }
 rdkafka = { version = "0.29", features = ["cmake-build", "ssl", "tracing"] }
 redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
@@ -23,6 +26,7 @@ serde = { version = "1", features = ["derive", "rc"] }
 serde_json = "1"
 thiserror = "1"
 tokio = { version = "1", features = ["full"] }
+tokio-util = { version = "0.7", optional = true }
 tracing = "0.1"
 
 [dev-dependencies]
@@ -31,7 +35,13 @@ tokio-executor-trait = "2.1"
 tokio-reactor-trait = "1.1"
 
 [features]
-default = ["memory_queue", "rabbitmq", "redis", "redis_cluster", "sqs"]
+default = ["memory_queue", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
+gcp_pubsub = [
+    "dep:futures-util",
+    "dep:google-cloud-googleapis",
+    "dep:google-cloud-pubsub",
+    "dep:tokio-util",
+]
 memory_queue = []
 rabbitmq = ["dep:lapin"]
 redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs
new file mode 100644
index 0000000..16227b4
--- /dev/null
+++ b/omniqueue/src/backends/gcp_pubsub.rs
@@ -0,0 +1,272 @@
+use crate::{
+    decoding::DecoderRegistry,
+    encoding::{CustomEncoder, EncoderRegistry},
+    queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
+    QueueError,
+};
+use async_trait::async_trait;
+use futures_util::StreamExt;
+use google_cloud_googleapis::pubsub::v1::PubsubMessage;
+use google_cloud_pubsub::client::{
+    google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig,
+};
+use google_cloud_pubsub::subscriber::ReceivedMessage;
+use google_cloud_pubsub::subscription::Subscription;
+use serde::Serialize;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::{any::TypeId, collections::HashMap};
+
+pub struct GcpPubSubBackend;
+
+type Payload = Vec<u8>;
+type Encoders = EncoderRegistry<Payload>;
+type Decoders = DecoderRegistry<Payload>;
+
+// FIXME: topic/subscription are each for read/write. Split config up?
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct GcpPubSubConfig {
+    pub topic_id: String,
+    pub subscription_id: String,
+    pub credentials_file: Option<PathBuf>,
+}
+
+/// Make a `ClientConfig` from a `CredentialsFile` on disk.
+async fn configure_client_from_file<P: AsRef<Path>>(
+    cred_file_path: P,
+) -> Result<ClientConfig, QueueError> {
+    let bytes = std::fs::read(cred_file_path).map_err(QueueError::generic)?;
+    let creds: CredentialsFile = serde_json::from_slice(&bytes).map_err(QueueError::generic)?;
+    ClientConfig::default()
+        .with_credentials(creds)
+        .await
+        .map_err(QueueError::generic)
+}
+
+/// Making a `ClientConfig` via env vars is possible in two ways:
+/// - setting `GOOGLE_APPLICATION_CREDENTIALS` to the file path to have it loaded automatically
+/// - setting `GOOGLE_APPLICATION_CREDENTIALS_JSON` to the file contents (avoiding the need for a
+///   file on disk).
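+///
+/// For example (the paths shown are illustrative only):
+/// ```sh
+/// export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
+/// # or, to avoid keeping the file on disk:
+/// export GOOGLE_APPLICATION_CREDENTIALS_JSON="$(cat /path/to/credentials.json)"
+/// ```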
+async fn configure_client_from_env() -> Result<ClientConfig, QueueError> {
+    ClientConfig::default()
+        .with_auth()
+        .await
+        .map_err(QueueError::generic)
+}
+
+async fn get_client(cfg: &GcpPubSubConfig) -> Result<Client, QueueError> {
+    let config = {
+        if let Some(fp) = &cfg.credentials_file {
+            tracing::trace!("reading gcp creds from file: {}", fp.display());
+            configure_client_from_file(&fp).await?
+        } else {
+            tracing::trace!("reading gcp creds from env");
+            configure_client_from_env().await?
+        }
+    };
+    Client::new(config).await.map_err(QueueError::generic)
+}
+
+impl GcpPubSubConsumer {
+    async fn new(
+        client: Client,
+        subscription_id: String,
+        registry: Decoders,
+    ) -> Result<Self, QueueError> {
+        Ok(Self {
+            client,
+            registry,
+            subscription_id: Arc::new(subscription_id),
+        })
+    }
+}
+
+impl GcpPubSubProducer {
+    async fn new(client: Client, topic_id: String, registry: Encoders) -> Result<Self, QueueError> {
+        let topic = client.topic(&topic_id);
+        // Only warn if the topic doesn't exist at this point.
+        // If it gets created after the fact we should still be able to use it once it's available;
+        // if it's still missing when we actually try to send, that's an error.
+        if !topic.exists(None).await.map_err(QueueError::generic)? {
+            tracing::warn!("topic {} does not exist", &topic_id);
+        }
+        Ok(Self {
+            client,
+            registry,
+            topic_id: Arc::new(topic_id),
+        })
+    }
+}
+
+#[async_trait]
+impl QueueBackend for GcpPubSubBackend {
+    type Config = GcpPubSubConfig;
+
+    type PayloadIn = Payload;
+    type PayloadOut = Payload;
+
+    type Producer = GcpPubSubProducer;
+    type Consumer = GcpPubSubConsumer;
+
+    async fn new_pair(
+        config: Self::Config,
+        custom_encoders: Encoders,
+        custom_decoders: Decoders,
+    ) -> Result<(GcpPubSubProducer, GcpPubSubConsumer), QueueError> {
+        let client = get_client(&config).await?;
+        Ok((
+            GcpPubSubProducer::new(client.clone(), config.topic_id, custom_encoders).await?,
+            GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await?,
+        ))
+    }
+
+    async fn producing_half(
+        config: Self::Config,
+        custom_encoders: EncoderRegistry<Vec<u8>>,
+    ) -> Result<Self::Producer, QueueError> {
+        let client = get_client(&config).await?;
+        GcpPubSubProducer::new(client, config.topic_id, custom_encoders).await
+    }
+
+    async fn consuming_half(
+        config: Self::Config,
+        custom_decoders: DecoderRegistry<Vec<u8>>,
+    ) -> Result<Self::Consumer, QueueError> {
+        let client = get_client(&config).await?;
+        GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await
+    }
+}
+
+pub struct GcpPubSubProducer {
+    client: Client,
+    registry: Encoders,
+    topic_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubProducer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubProducer")
+            .field("topic_id", &self.topic_id)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl QueueProducer for GcpPubSubProducer {
+    type Payload = Payload;
+
+    fn get_custom_encoders(&self) -> &HashMap<TypeId, Box<dyn CustomEncoder<Self::Payload>>> {
+        self.registry.as_ref()
+    }
+
+    async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
+        let msg = PubsubMessage {
+            data: payload.to_vec(),
+            ..Default::default()
+        };
+
+        // N.b. defer the creation of a publisher/topic until needed. Helps recover when
+        // the topic does not yet exist, but will soon.
+        // Might be more expensive to recreate each time, but overall more reliable.
+        let topic = self.client.topic(&self.topic_id);
+
+        // Publishing to a non-existent topic will cause the publisher to wait (forever?).
+        // Returning an error here lets dependents handle the failure immediately,
+        // instead of holding the connection open indefinitely.
+        if !topic.exists(None).await.map_err(QueueError::generic)? {
+            return Err(QueueError::Generic(
+                format!("topic {} does not exist", &self.topic_id).into(),
+            ));
+        }
+        // FIXME: may need to expose `PublisherConfig` to caller so they can tweak this
+        let publisher = topic.new_publisher(None);
+        let awaiter = publisher.publish(msg).await;
+        awaiter.get().await.map_err(QueueError::generic)?;
+        Ok(())
+    }
+
+    async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<(), QueueError> {
+        self.send_raw(&serde_json::to_vec(&payload)?).await
+    }
+}
+
+pub struct GcpPubSubConsumer {
+    client: Client,
+    registry: Decoders,
+    subscription_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubConsumer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubConsumer")
+            .field("subscription_id", &self.subscription_id)
+            .finish()
+    }
+}
+
+async fn subscription(client: &Client, subscription_id: &str) -> Result<Subscription, QueueError> {
+    let subscription = client.subscription(subscription_id);
+    if !subscription
+        .exists(None)
+        .await
+        .map_err(QueueError::generic)?
+    {
+        return Err(QueueError::Generic(
+            format!("subscription {} does not exist", &subscription_id).into(),
+        ));
+    }
+    Ok(subscription)
+}
+
+#[async_trait]
+impl QueueConsumer for GcpPubSubConsumer {
+    type Payload = Payload;
+
+    async fn receive(&mut self) -> Result<Delivery, QueueError> {
+        let subscription = subscription(&self.client, &self.subscription_id).await?;
+        let mut stream = subscription
+            .subscribe(None)
+            .await
+            .map_err(QueueError::generic)?;
+
+        let mut recv_msg = stream.next().await.ok_or_else(|| QueueError::NoData)?;
+        // FIXME: would be nice to avoid having to move the data out here.
+        // While it's possible to ack via a subscription and an ack_id, nack is only
+        // possible via a `ReceivedMessage`. This means we either need to hold 2 copies of
+        // the payload, or move the bytes out so they can be returned _outside of the Acker_.
+        let payload = recv_msg.message.data.drain(..).collect();
+        Ok(Delivery {
+            decoders: self.registry.clone(),
+            acker: Box::new(GcpPubSubAcker {
+                recv_msg,
+                subscription_id: self.subscription_id.clone(),
+            }),
+            payload: Some(payload),
+        })
+    }
+}
+
+pub struct GcpPubSubAcker {
+    recv_msg: ReceivedMessage,
+    subscription_id: Arc<String>,
+}
+
+impl std::fmt::Debug for GcpPubSubAcker {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GcpPubSubAcker")
+            .field("ack_id", &self.recv_msg.ack_id())
+            .field("message_id", &self.recv_msg.message.message_id)
+            .field("subscription_id", &self.subscription_id)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl Acker for GcpPubSubAcker {
+    async fn ack(&mut self) -> Result<(), QueueError> {
+        self.recv_msg.ack().await.map_err(QueueError::generic)
+    }
+
+    async fn nack(&mut self) -> Result<(), QueueError> {
+        self.recv_msg.nack().await.map_err(QueueError::generic)
+    }
+}
diff --git a/omniqueue/src/backends/mod.rs b/omniqueue/src/backends/mod.rs
index a346c28..8d4c2a9 100644
--- a/omniqueue/src/backends/mod.rs
+++ b/omniqueue/src/backends/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "gcp_pubsub")]
+pub mod gcp_pubsub;
 #[cfg(feature = "memory_queue")]
 pub mod memory_queue;
 #[cfg(feature = "rabbitmq")]
diff --git a/omniqueue/tests/gcp_pubsub.rs b/omniqueue/tests/gcp_pubsub.rs
new file mode 100644
index 0000000..6a06316
--- /dev/null
+++ b/omniqueue/tests/gcp_pubsub.rs
@@ -0,0 +1,198 @@
+//! Support for Google Cloud Pub/Sub.
+//!
+//! In this system subscriptions are like queue bindings to topics.
+//! Consumers need a subscription id to start receiving messages.
+//! We don't have any public API for managing/creating/deleting subscriptions in this module, so
+//! this is left to the user to do via whatever method they like.
+//!
+//! -
+//! -
+//! - (how to publish messages ad hoc, helpful for debugging)
+//!
+//! Don't have a better place to mention this just yet.
+//! When testing against the gcloud emulator, you need to set `PUBSUB_EMULATOR_HOST` to the bind
+//! address, and `PUBSUB_PROJECT_ID` (matching however the emulator was configured).
+//! This should bypass the need for credentials and so on.
+//! ```sh
+//! export PUBSUB_EMULATOR_HOST=localhost:8085
+//! export PUBSUB_PROJECT_ID=local-project
+//! ```
+//! > N.b. the rust client hardcodes the project id to `local-project` when it sees the
+//! > `PUBSUB_EMULATOR_HOST` env var in use, so if you see errors about resources not found etc, it
+//! > might be because of a project mismatch.
+//!
+//! To use the `gcloud` CLI with the emulator (useful for creating topics/subscriptions), you have
+//! to configure an override for the pubsub API:
+//!
+//! ```sh
+//! gcloud config set api_endpoint_overrides/pubsub "http://${PUBSUB_EMULATOR_HOST}/"
+//! ```
+//! Note that you'll also have to manually set it back to the default as needed:
+//! ```sh
+//! gcloud config unset api_endpoint_overrides/pubsub
+//! ```
+//! h/t
+//!
+//! Also note, and this is odd, `gcloud` will prompt you to login even though you're trying to
+//! connect to a local process.
+//! Go ahead and follow the prompts to get your CLI working.
+//!
+//! I guess it still wants to talk to GCP for interactions other than the pubsub API.
+//!
+//! ## Example `gcloud` usage:
+//! ```sh
+//! gcloud --project=local-project pubsub topics create tester
+//! gcloud --project=local-project pubsub topics create dead-letters
+//! gcloud --project=local-project pubsub subscriptions create local-1 \
+//!     --topic=tester \
+//!     --dead-letter-topic=dead-letters \
+//!     --max-delivery-attempts=5
+//! gcloud --project local-project pubsub topics publish tester --message='{"my message": 1234}'
+//! ```
+
+use google_cloud_googleapis::pubsub::v1::DeadLetterPolicy;
+use google_cloud_pubsub::client::{Client, ClientConfig};
+use google_cloud_pubsub::subscription::SubscriptionConfig;
+
+use omniqueue::backends::gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig};
+use omniqueue::queue::{
+    consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static,
+};
+use serde::{Deserialize, Serialize};
+
+const DEFAULT_PUBSUB_EMULATOR_HOST: &str = "localhost:8085";
+/// Controls how many times a message can be nack'd before it lands on the dead letter topic.
+const MAX_DELIVERY_ATTEMPTS: i32 = 5;
+
+async fn get_client() -> Client {
+    // The `Default` impl for `ClientConfig` looks for this env var. When set, it branches into
+    // local mode, using the addr in the env var and a hardcoded project id of `local-project`.
+    if std::env::var("PUBSUB_EMULATOR_HOST").is_err() {
+        std::env::set_var("PUBSUB_EMULATOR_HOST", DEFAULT_PUBSUB_EMULATOR_HOST);
+    }
+    Client::new(ClientConfig::default()).await.unwrap()
+}
+
+// FIXME: check to see if there's already one of these in here somewhere
+fn random_chars() -> impl Iterator<Item = char> {
+    std::iter::repeat_with(fastrand::alphanumeric)
+}
+
+/// Returns a [`QueueBuilder`] configured to connect to the GCP emulator instance spawned by the
+/// file `testing-docker-compose.yml` in the root of the repository.
+///
+/// Additionally this will make a temporary topic/subscription on that instance for the duration of
+/// the test, so as to ensure tests don't steal each other's messages.
+async fn make_test_queue() -> QueueBuilder<GcpPubSubBackend, Static> {
+    let client = get_client().await;
+
+    let topic_name: String = "topic-".chars().chain(random_chars().take(8)).collect();
+    // Need to define a dead letter topic to keep the "bad" test cases from pulling the nacked
+    // messages again and again.
+    let dead_letter_topic_name: String = "topic-".chars().chain(random_chars().take(8)).collect();
+    let subscription_name: String = "subscription-"
+        .chars()
+        .chain(random_chars().take(8))
+        .collect();
+
+    let topic = client.create_topic(&topic_name, None, None).await.unwrap();
+    let dead_letter_topic = client
+        .create_topic(&dead_letter_topic_name, None, None)
+        .await
+        .unwrap();
+    let subscription = client
+        .create_subscription(
+            &subscription_name,
+            &topic_name,
+            SubscriptionConfig {
+                // Messages published to the topic need to supply a unique ID to make use of this
+                enable_exactly_once_delivery: true,
+                dead_letter_policy: Some(DeadLetterPolicy {
+                    dead_letter_topic: dead_letter_topic.fully_qualified_name().into(),
+                    max_delivery_attempts: MAX_DELIVERY_ATTEMPTS,
+                }),
+                ..Default::default()
+            },
+            None,
+        )
+        .await
+        .unwrap();
+
+    let config = GcpPubSubConfig {
+        topic_id: topic.id(),
+        subscription_id: subscription.id(),
+        credentials_file: None,
+    };
+
+    GcpPubSubBackend::builder(config)
+}
+
+#[tokio::test]
+async fn test_raw_send_recv() {
+    let payload = b"{\"test\": \"data\"}";
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_raw(&payload.to_vec()).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+}
+
+#[tokio::test]
+async fn test_bytes_send_recv() {
+    let payload = b"hello";
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_bytes(payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.borrow_payload().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct ExType {
+    a: u8,
+}
+
+#[tokio::test]
+async fn test_serde_send_recv() {
+    let payload = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+}
+
+#[tokio::test]
+async fn test_custom_send_recv() {
+    let payload = ExType { a: 3 };
+
+    let encoder = |p: &ExType| Ok(vec![p.a]);
+    let decoder = |p: &Vec<u8>| {
+        Ok(ExType {
+            a: p.first().copied().unwrap_or(0),
+        })
+    };
+
+    let (p, mut c) = make_test_queue()
+        .await
+        .with_encoder(encoder)
+        .with_decoder(decoder)
+        .build_pair()
+        .await
+        .unwrap();
+
+    p.send_custom(&payload).await.unwrap();
+
+    let d = c.receive().await.unwrap();
+    assert_eq!(d.payload_custom::<ExType>().unwrap().unwrap(), payload);
+
+    // Because it doesn't use JSON, this should fail:
+    d.payload_serde_json::<ExType>().unwrap_err();
+    d.ack().await.unwrap();
+}
diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml
index 21c71c0..c8b8b6a 100644
--- a/testing-docker-compose.yml
+++ b/testing-docker-compose.yml
@@ -72,3 +72,13 @@ services:
       REDIS_NODES: "redis-cluster redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2 redis-cluster-node-3 redis-cluster-node-4"
     ports:
       - "6385:6379"
+
+  gcp-pubsub:
+    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
+    ports:
+      - "8085:8085"
+    command: [
+      "gcloud", "beta", "emulators", "pubsub", "start",
+      "--project", "local-project",
+      "--host-port", "0.0.0.0:8085"
+    ]
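
For a quick end-to-end picture, here is a minimal sketch of how the new backend is expected to be wired up, pieced together from the tests above. The `tester`/`local-1` ids are placeholders; the topic and subscription are assumed to already exist (e.g. created with the `gcloud` commands in the test module docs):

```rust
use omniqueue::backends::gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig};
use omniqueue::queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend};

#[tokio::main]
async fn main() {
    // Placeholder ids: the topic and subscription must already exist.
    let config = GcpPubSubConfig {
        topic_id: "tester".to_string(),
        subscription_id: "local-1".to_string(),
        // `None` falls back to the GOOGLE_APPLICATION_CREDENTIALS* env vars (or the emulator).
        credentials_file: None,
    };

    // One config yields a connected producer/consumer pair.
    let (p, mut c) = GcpPubSubBackend::builder(config).build_pair().await.unwrap();

    // Publish a raw payload, then pull it back off the subscription and ack it.
    p.send_raw(&b"hello, world".to_vec()).await.unwrap();
    let d = c.receive().await.unwrap();
    println!("{:?}", d.borrow_payload().unwrap());
    d.ack().await.unwrap();
}
```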