add support for GCP PubSub
Largely this is a port of code written for `svix-bridge` in the
`generic-queue` crate.

Adds tests modeled after the rest of the backends here, but with
GCP-specific setup and configuration.

Scheduled delivery is not implemented since it is not natively
supported by GCP PubSub. There are workarounds, but they rely on
conventions established by the calling code, so I'm not sure it makes
sense to codify them here in the lib.
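
For context, one such workaround is an attribute convention agreed on by
producers and consumers: stamp each message with a delivery timestamp and have
consumers nack anything that arrives early. A rough sketch, where the
`deliver_at_ms` attribute name is hypothetical:

    use google_cloud_googleapis::pubsub::v1::PubsubMessage;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Hypothetical convention: stamp the message with the earliest time
    /// (ms since the epoch) at which a consumer should process it.
    fn delayed_message(data: Vec<u8>, delay_ms: u64) -> PubsubMessage {
        let deliver_at_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the epoch")
            .as_millis() as u64
            + delay_ms;
        let mut msg = PubsubMessage { data, ..Default::default() };
        msg.attributes
            .insert("deliver_at_ms".to_string(), deliver_at_ms.to_string());
        msg
    }

A consumer honoring the convention would parse the attribute and nack
(triggering redelivery) until the deadline passes.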
svix-onelson committed Aug 10, 2023
1 parent 8934ac0 commit 21dc933
Showing 5 changed files with 496 additions and 1 deletion.
12 changes: 11 additions & 1 deletion omniqueue/Cargo.toml
@@ -16,13 +16,17 @@ aws-sdk-sqs = { version = "0.25", optional = true }
bb8 = { version = "0.8", optional = true }
bb8-redis = { version = "0.13", optional = true }
futures = { version = "0.3", default-features = false, features = ["async-await", "std"] }
futures-util = { version = "0.3.28", optional = true }
google-cloud-googleapis = { version = "0.10.0", optional = true }
google-cloud-pubsub = { version = "0.18.0", optional = true }
lapin = { version = "2", optional = true }
rdkafka = { version = "0.29", features = ["cmake-build", "ssl", "tracing"] }
redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", optional = true }
tracing = "0.1"

[dev-dependencies]
@@ -31,7 +35,13 @@ tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"

[features]
default = ["memory_queue", "rabbitmq", "redis", "redis_cluster", "sqs"]
default = ["memory_queue", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
gcp_pubsub = [
"dep:futures-util",
"dep:google-cloud-googleapis",
"dep:google-cloud-pubsub",
"dep:tokio-util",
]
memory_queue = []
rabbitmq = ["dep:lapin"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
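# For a downstream crate that wants only this backend, the dependency would
# look roughly like this (the version number here is an assumption):
#
#     omniqueue = { version = "0.1", default-features = false, features = ["gcp_pubsub"] }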
275 changes: 275 additions & 0 deletions omniqueue/src/backends/gcp_pubsub.rs
@@ -0,0 +1,275 @@
use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
QueueError,
};
use async_trait::async_trait;
use futures_util::StreamExt;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::{
google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig,
};
use google_cloud_pubsub::subscriber::ReceivedMessage;
use google_cloud_pubsub::subscription::Subscription;
use serde::Serialize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{any::TypeId, collections::HashMap};

pub struct GcpPubSubBackend;

type Payload = Vec<u8>;
type Encoders = EncoderRegistry<Payload>;
type Decoders = DecoderRegistry<Payload>;

// FIXME: the topic is only used for writes (producer) and the subscription
// only for reads (consumer). Split the config up?
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GcpPubSubConfig {
pub topic_id: String,
pub subscription_id: String,
pub credentials_file: Option<PathBuf>,
}
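
// A sketch of filling in this config. The IDs below are placeholders; setting
// `credentials_file` to `None` falls back to environment-based auth (see below).
fn example_config() -> GcpPubSubConfig {
    GcpPubSubConfig {
        topic_id: "my-topic".to_string(),
        subscription_id: "my-subscription".to_string(),
        credentials_file: Some(PathBuf::from("/path/to/credentials.json")),
    }
}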

/// Make a `ClientConfig` from a `CredentialsFile` on disk.
async fn configure_client_from_file<P: AsRef<Path>>(
cred_file_path: P,
) -> Result<ClientConfig, QueueError> {
let bytes = std::fs::read(cred_file_path).map_err(QueueError::generic)?;
let creds: CredentialsFile = serde_json::from_slice(&bytes).map_err(QueueError::generic)?;
ClientConfig::default()
.with_credentials(creds)
.await
.map_err(QueueError::generic)
}

/// Making a `ClientConfig` via env vars is possible in two ways:
/// - setting `GOOGLE_APPLICATION_CREDENTIALS` to the file path to have it loaded automatically
/// - setting `GOOGLE_APPLICATION_CREDENTIALS_JSON` to the file contents (avoiding the need for a
/// file on disk).
async fn configure_client_from_env() -> Result<ClientConfig, QueueError> {
ClientConfig::default()
.with_auth()
.await
.map_err(QueueError::generic)
}
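
// As a sketch, the same loader can be pointed at a key file by the process
// itself before building the config; the path here is a placeholder.
async fn example_client_from_env() -> Result<ClientConfig, QueueError> {
    std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/service-account.json");
    configure_client_from_env().await
}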

async fn get_client(cfg: &GcpPubSubConfig) -> Result<Client, QueueError> {
let config = {
if let Some(fp) = &cfg.credentials_file {
tracing::trace!("reading gcp creds from file: {}", fp.display());
configure_client_from_file(fp).await?
} else {
tracing::trace!("reading gcp creds from env");
configure_client_from_env().await?
}
};
Client::new(config).await.map_err(QueueError::generic)
}

impl GcpPubSubConsumer {
async fn new(
client: Client,
subscription_id: String,
registry: Decoders,
) -> Result<Self, QueueError> {
Ok(Self {
client,
registry,
subscription_id: Arc::new(subscription_id),
})
}
}

impl GcpPubSubProducer {
async fn new(client: Client, topic_id: String, registry: Encoders) -> Result<Self, QueueError> {
let topic = client.topic(&topic_id);
// Only warn if the topic doesn't exist at this point.
// If it's created later, we can still use it once it's available; if it's
// still missing when we try to publish, `send_raw` will return an error.
if !topic.exists(None).await.map_err(QueueError::generic)? {
tracing::warn!("topic {} does not exist", &topic_id);
}
Ok(Self {
client,
registry,
topic_id: Arc::new(topic_id),
})
}
}

#[async_trait]
impl QueueBackend for GcpPubSubBackend {
type Config = GcpPubSubConfig;

type PayloadIn = Payload;
type PayloadOut = Payload;

type Producer = GcpPubSubProducer;
type Consumer = GcpPubSubConsumer;

async fn new_pair(
config: Self::Config,
custom_encoders: Encoders,
custom_decoders: Decoders,
) -> Result<(GcpPubSubProducer, GcpPubSubConsumer), QueueError> {
let client = get_client(&config).await?;
Ok((
GcpPubSubProducer::new(client.clone(), config.topic_id, custom_encoders).await?,
GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await?,
))
}

async fn producing_half(
config: Self::Config,
custom_encoders: EncoderRegistry<Self::PayloadIn>,
) -> Result<GcpPubSubProducer, QueueError> {
let client = get_client(&config).await?;
GcpPubSubProducer::new(client, config.topic_id, custom_encoders).await
}

async fn consuming_half(
config: Self::Config,
custom_decoders: DecoderRegistry<Self::PayloadOut>,
) -> Result<GcpPubSubConsumer, QueueError> {
let client = get_client(&config).await?;
GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await
}
}
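
// A sketch of building both halves through the trait with no custom codecs
// registered; this assumes the registry aliases wrap `Arc<HashMap<..>>`, as
// their use elsewhere in this module suggests.
async fn example_pair(
    config: GcpPubSubConfig,
) -> Result<(GcpPubSubProducer, GcpPubSubConsumer), QueueError> {
    GcpPubSubBackend::new_pair(config, Arc::new(HashMap::new()), Arc::new(HashMap::new())).await
}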

pub struct GcpPubSubProducer {
client: Client,
registry: Encoders,
topic_id: Arc<String>,
}

impl std::fmt::Debug for GcpPubSubProducer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("GcpPubSubProducer")
.field("topic_id", &self.topic_id)
.finish()
}
}

#[async_trait]
impl QueueProducer for GcpPubSubProducer {
type Payload = Payload;

fn get_custom_encoders(&self) -> &HashMap<TypeId, Box<dyn CustomEncoder<Self::Payload>>> {
self.registry.as_ref()
}

async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
let msg = PubsubMessage {
data: payload.to_vec(),
..Default::default()
};

// N.b. defer the creation of the topic/publisher until needed. This helps
// recover when the topic doesn't exist yet but is created soon after.
// Recreating them on every send may be more expensive, but it's more
// reliable overall.
let topic = self.client.topic(&self.topic_id);

// Publishing to a non-existent topic causes the publisher to block (possibly
// forever). Returning an error here lets callers handle the failure
// immediately instead of holding the connection open indefinitely.
if !topic.exists(None).await.map_err(QueueError::generic)? {
return Err(QueueError::Generic(
format!("topic {} does not exist", &topic.id()).into(),
));
}
// FIXME: may need to expose `PublisherConfig` to caller so they can tweak this
let publisher = topic.new_publisher(None);
let awaiter = publisher.publish(msg).await;
awaiter.get().await.map_err(QueueError::generic)?;
Ok(())
}

async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<(), QueueError> {
self.send_raw(&serde_json::to_vec(&payload)?).await
}
}
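
// A sketch of sending a serde-serializable payload; `QueueProducer` is already
// in scope in this module, and the event type is a placeholder.
#[derive(Serialize)]
struct ExampleEvent<'a> {
    kind: &'a str,
}

async fn example_publish(producer: &GcpPubSubProducer) -> Result<(), QueueError> {
    producer.send_serde_json(&ExampleEvent { kind: "ping" }).await
}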

pub struct GcpPubSubConsumer {
client: Client,
registry: Decoders,
subscription_id: Arc<String>,
}

impl std::fmt::Debug for GcpPubSubConsumer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("GcpPubSubConsumer")
.field("subscription_id", &self.subscription_id)
.finish()
}
}

async fn subscription(client: &Client, subscription_id: &str) -> Result<Subscription, QueueError> {
let subscription = client.subscription(subscription_id);
if !subscription
.exists(None)
.await
.map_err(QueueError::generic)?
{
return Err(QueueError::Generic(
format!("subscription {} does not exist", &subscription_id).into(),
));
}
Ok(subscription)
}

#[async_trait]
impl QueueConsumer for GcpPubSubConsumer {
type Payload = Payload;

async fn receive(&mut self) -> Result<Delivery, QueueError> {
let subscription = subscription(&self.client, &self.subscription_id).await?;
let mut stream = subscription
.subscribe(None)
.await
.map_err(QueueError::generic)?;

let mut recv_msg = stream.next().await.ok_or_else(|| QueueError::NoData)?;
// FIXME: would be nice to avoid having to move the data out here.
// While it's possible to ack via a subscription and an ack_id, nack is only
// possible via a `ReceivedMessage`. This means we either need to hold 2 copies of
// the payload, or move the bytes out so they can be returned _outside of the Acker_.
let payload = recv_msg.message.data.drain(..).collect();
Ok(Delivery {
decoders: self.registry.clone(),
acker: Box::new(GcpPubSubAcker {
recv_msg,
subscription_id: self.subscription_id.clone(),
}),
payload: Some(payload),
})
}
}

pub struct GcpPubSubAcker {
recv_msg: ReceivedMessage,
subscription_id: Arc<String>,
}

impl std::fmt::Debug for GcpPubSubAcker {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("GcpPubSubAcker")
.field("ack_id", &self.recv_msg.ack_id())
.field("message_id", &self.recv_msg.message.message_id)
.field("subscription_id", &self.subscription_id)
.finish()
}
}

#[async_trait]
impl Acker for GcpPubSubAcker {
async fn ack(&mut self) -> Result<(), QueueError> {
self.recv_msg.ack().await.map_err(QueueError::generic)
}

async fn nack(&mut self) -> Result<(), QueueError> {
self.recv_msg.nack().await.map_err(QueueError::generic)
}
}
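
// A sketch of driving receive/ack end-to-end, in the spirit of this crate's
// backend tests; field access on `Delivery` assumes crate-internal visibility,
// mirroring how `receive` constructs it above.
async fn example_receive_and_ack(consumer: &mut GcpPubSubConsumer) -> Result<Payload, QueueError> {
    let mut delivery = consumer.receive().await?;
    let payload = delivery.payload.take().ok_or(QueueError::NoData)?;
    // Ack through the boxed `Acker` that `receive` attached.
    delivery.acker.ack().await?;
    Ok(payload)
}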

#[cfg(test)]
mod tests {}
2 changes: 2 additions & 0 deletions omniqueue/src/backends/mod.rs
@@ -1,3 +1,5 @@
#[cfg(feature = "gcp_pubsub")]
pub mod gcp_pubsub;
#[cfg(feature = "memory_queue")]
pub mod memory_queue;
#[cfg(feature = "rabbitmq")]