Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GCP PubSub support #13

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ aws-sdk-sqs = { version = "0.25", optional = true }
bb8 = { version = "0.8", optional = true }
bb8-redis = { version = "0.13", optional = true }
futures = { version = "0.3", default-features = false, features = ["async-await", "std"] }
futures-util = { version = "0.3.28", optional = true }
google-cloud-googleapis = { version = "0.10.0", optional = true }
google-cloud-pubsub = { version = "0.18.0", optional = true }
lapin = { version = "2", optional = true }
rdkafka = { version = "0.29", features = ["cmake-build", "ssl", "tracing"] }
redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", optional = true }
tracing = "0.1"

[dev-dependencies]
Expand All @@ -31,7 +35,13 @@ tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"

[features]
default = ["memory_queue", "rabbitmq", "redis", "redis_cluster", "sqs"]
default = ["memory_queue", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
# Enables the GCP Pub/Sub backend and pulls in the optional dependencies it needs.
gcp_pubsub = [
"dep:futures-util",
"dep:google-cloud-googleapis",
"dep:google-cloud-pubsub",
"dep:tokio-util",
]
memory_queue = []
rabbitmq = ["dep:lapin"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
Expand Down
272 changes: 272 additions & 0 deletions omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
QueueError,
};
use async_trait::async_trait;
use futures_util::StreamExt;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::{
google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig,
};
use google_cloud_pubsub::subscriber::ReceivedMessage;
use google_cloud_pubsub::subscription::Subscription;
use serde::Serialize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{any::TypeId, collections::HashMap};

/// Marker type for the GCP Pub/Sub backend.
///
/// It carries no data; it exists so that `QueueBackend` can be implemented on it to
/// build `GcpPubSubProducer` / `GcpPubSubConsumer` values.
pub struct GcpPubSubBackend;

/// Raw message body used by this backend: an owned byte buffer.
type Payload = Vec<u8>;
/// Custom encoder registry specialised to this backend's byte payload.
type Encoders = EncoderRegistry<Payload>;
/// Custom decoder registry specialised to this backend's byte payload.
type Decoders = DecoderRegistry<Payload>;

// FIXME: topic/subscription are each for read/write. Split config up?
/// Connection settings shared by the producing and consuming halves of the backend.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GcpPubSubConfig {
/// Id of the topic the producer publishes to.
pub topic_id: String,
/// Id of the subscription the consumer receives from.
pub subscription_id: String,
/// Path to a credentials JSON file. When `None`, credentials are resolved from the
/// environment instead.
pub credentials_file: Option<PathBuf>,
}

/// Builds a `ClientConfig` from the `CredentialsFile` JSON stored at `cred_file_path`.
///
/// # Errors
///
/// Returns a generic `QueueError` when the file cannot be read, when its contents do not
/// deserialize into a `CredentialsFile`, or when applying the credentials to the config fails.
async fn configure_client_from_file<P: AsRef<Path>>(
    cred_file_path: P,
) -> Result<ClientConfig, QueueError> {
    // NOTE(review): this is a synchronous read inside an async fn. It is presumably fine
    // for a small credentials file read once at startup -- confirm, or move to an async read.
    let raw = std::fs::read(cred_file_path).map_err(QueueError::generic)?;
    let parsed = serde_json::from_slice::<CredentialsFile>(&raw);
    let creds = parsed.map_err(QueueError::generic)?;

    let configured = ClientConfig::default().with_credentials(creds).await;
    configured.map_err(QueueError::generic)
}

/// Builds a `ClientConfig` using credentials discovered from the environment.
///
/// Two environment variables are supported:
/// - `GOOGLE_APPLICATION_CREDENTIALS`: path to a credentials file, loaded automatically.
/// - `GOOGLE_APPLICATION_CREDENTIALS_JSON`: the credentials file contents themselves
///   (avoiding the need for a file on disk).
///
/// # Errors
///
/// Returns a generic `QueueError` when authentication setup fails.
async fn configure_client_from_env() -> Result<ClientConfig, QueueError> {
    let configured = ClientConfig::default().with_auth().await;
    configured.map_err(QueueError::generic)
}

/// Creates a Pub/Sub `Client` for the given backend configuration.
///
/// Credentials come from `cfg.credentials_file` when it is set, and from the environment
/// otherwise.
///
/// # Errors
///
/// Returns a generic `QueueError` when the credentials cannot be loaded or the client
/// cannot be created.
async fn get_client(cfg: &GcpPubSubConfig) -> Result<Client, QueueError> {
    let config = match &cfg.credentials_file {
        Some(path) => {
            tracing::trace!("reading gcp creds from file: {}", path.display());
            configure_client_from_file(path).await?
        }
        None => {
            tracing::trace!("reading gcp creds from env");
            configure_client_from_env().await?
        }
    };

    let client = Client::new(config).await;
    client.map_err(QueueError::generic)
}

impl GcpPubSubConsumer {
    /// Wraps an existing client into a consumer bound to `subscription_id`.
    ///
    /// No request is made here; whether the subscription exists is only checked when a
    /// message is received.
    async fn new(
        client: Client,
        subscription_id: String,
        registry: Decoders,
    ) -> Result<Self, QueueError> {
        let subscription_id = Arc::new(subscription_id);
        Ok(Self {
            client,
            registry,
            subscription_id,
        })
    }
}

impl GcpPubSubProducer {
    /// Wraps an existing client into a producer bound to `topic_id`.
    ///
    /// # Errors
    ///
    /// Returns a generic `QueueError` only when the topic existence check itself fails;
    /// a missing topic is not an error here.
    async fn new(client: Client, topic_id: String, registry: Encoders) -> Result<Self, QueueError> {
        let topic_found = client
            .topic(&topic_id)
            .exists(None)
            .await
            .map_err(QueueError::generic)?;

        // A missing topic is deliberately just a warning at construction time: it may be
        // created later, in which case sends start working. If it is still missing when a
        // message is sent, that send fails with an error.
        if !topic_found {
            tracing::warn!("topic {} does not exist", &topic_id);
        }

        let topic_id = Arc::new(topic_id);
        Ok(Self {
            client,
            registry,
            topic_id,
        })
    }
}

#[async_trait]
impl QueueBackend for GcpPubSubBackend {
type Config = GcpPubSubConfig;

type PayloadIn = Payload;
type PayloadOut = Payload;

type Producer = GcpPubSubProducer;
type Consumer = GcpPubSubConsumer;

async fn new_pair(
config: Self::Config,
custom_encoders: Encoders,
custom_decoders: Decoders,
) -> Result<(GcpPubSubProducer, GcpPubSubConsumer), QueueError> {
let client = get_client(&config).await?;
Ok((
GcpPubSubProducer::new(client.clone(), config.topic_id, custom_encoders).await?,
GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await?,
))
}

async fn producing_half(
config: Self::Config,
custom_encoders: EncoderRegistry<Self::PayloadIn>,
) -> Result<GcpPubSubProducer, QueueError> {
let client = get_client(&config).await?;
GcpPubSubProducer::new(client, config.topic_id, custom_encoders).await
}

async fn consuming_half(
config: Self::Config,
custom_decoders: DecoderRegistry<Self::PayloadOut>,
) -> Result<GcpPubSubConsumer, QueueError> {
let client = get_client(&config).await?;
GcpPubSubConsumer::new(client, config.subscription_id, custom_decoders).await
}
}

/// Producing half of the GCP Pub/Sub backend; publishes payloads to a single topic.
pub struct GcpPubSubProducer {
// Client used to look the topic up again on every send.
client: Client,
// Custom encoders handed out by `get_custom_encoders`.
registry: Encoders,
// Id of the topic that messages are published to.
topic_id: Arc<String>,
}

/// Debug output shows only the topic id; the client and encoder registry are omitted.
impl std::fmt::Debug for GcpPubSubProducer {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut repr = f.debug_struct("GcpPubSubProducer");
        repr.field("topic_id", &self.topic_id);
        repr.finish()
    }
}

#[async_trait]
impl QueueProducer for GcpPubSubProducer {
    type Payload = Payload;

    /// Returns the custom encoders registered for this producer.
    fn get_custom_encoders(&self) -> &HashMap<TypeId, Box<dyn CustomEncoder<Self::Payload>>> {
        self.registry.as_ref()
    }

    /// Publishes `payload` as the data of a single Pub/Sub message and waits for the
    /// publish to be confirmed.
    ///
    /// # Errors
    ///
    /// Returns a generic `QueueError` when the topic does not exist, when the existence
    /// check fails, or when publishing fails.
    async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
        // N.b. the topic/publisher are created lazily on every send. That helps recover
        // when the topic does not exist yet but will soon. It might be more expensive
        // than keeping one around, but it is overall more reliable.
        let topic = self.client.topic(&self.topic_id);

        // Publishing to a non-existent topic will cause the publisher to wait (forever?).
        // Failing fast here lets dependents handle that case immediately instead of
        // holding the connection open indefinitely.
        let topic_found = topic.exists(None).await.map_err(QueueError::generic)?;
        if !topic_found {
            let reason = format!("topic {} does not exist", &self.topic_id);
            return Err(QueueError::Generic(reason.into()));
        }

        let msg = PubsubMessage {
            data: payload.clone(),
            ..Default::default()
        };

        // FIXME: may need to expose `PublisherConfig` to caller so they can tweak this
        let publisher = topic.new_publisher(None);
        let pending = publisher.publish(msg).await;
        pending.get().await.map_err(QueueError::generic)?;
        Ok(())
    }

    /// Serializes `payload` to JSON bytes and publishes them via `send_raw`.
    async fn send_serde_json<P: Serialize + Sync>(&self, payload: &P) -> Result<(), QueueError> {
        let encoded = serde_json::to_vec(payload)?;
        self.send_raw(&encoded).await
    }
}

/// Consuming half of the GCP Pub/Sub backend; receives messages from a single subscription.
pub struct GcpPubSubConsumer {
// Client used to look the subscription up on every receive.
client: Client,
// Custom decoders, cloned into every `Delivery` this consumer returns.
registry: Decoders,
// Id of the subscription to pull from; shared with the ackers created per message.
subscription_id: Arc<String>,
}
/// Debug output shows only the subscription id; the client and decoder registry are omitted.
impl std::fmt::Debug for GcpPubSubConsumer {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut repr = f.debug_struct("GcpPubSubConsumer");
        repr.field("subscription_id", &self.subscription_id);
        repr.finish()
    }
}

/// Looks up `subscription_id` on `client` and returns its handle if the subscription exists.
///
/// # Errors
///
/// Returns a generic `QueueError` when the existence check fails or when the subscription
/// does not exist.
async fn subscription(client: &Client, subscription_id: &str) -> Result<Subscription, QueueError> {
    let handle = client.subscription(subscription_id);
    let found = handle.exists(None).await.map_err(QueueError::generic)?;

    if found {
        Ok(handle)
    } else {
        let reason = format!("subscription {} does not exist", &subscription_id);
        Err(QueueError::Generic(reason.into()))
    }
}

#[async_trait]
impl QueueConsumer for GcpPubSubConsumer {
    type Payload = Payload;

    /// Receives the next message from the subscription and wraps it in a `Delivery`.
    ///
    /// The subscription is looked up (and checked for existence) on every call, and a
    /// fresh message stream is opened for it.
    ///
    /// # Errors
    ///
    /// Returns a generic `QueueError` when the subscription does not exist or a request
    /// fails, and `QueueError::NoData` when the stream ends without yielding a message.
    async fn receive(&mut self) -> Result<Delivery, QueueError> {
        let sub = subscription(&self.client, &self.subscription_id).await?;
        let mut stream = sub.subscribe(None).await.map_err(QueueError::generic)?;

        let mut recv_msg = stream.next().await.ok_or(QueueError::NoData)?;

        // FIXME: would be nice to avoid having to move the data out here.
        //   While it's possible to ack via a subscription and an ack_id, nack is only
        //   possible via a `ReceivedMessage`. This means we either need to hold 2 copies
        //   of the payload, or move the bytes out so they can be returned _outside of
        //   the Acker_.
        //
        // `mem::take` moves the buffer out without copying it and leaves an empty `Vec`
        // behind in the message kept by the acker.
        let payload = std::mem::take(&mut recv_msg.message.data);

        Ok(Delivery {
            decoders: self.registry.clone(),
            acker: Box::new(GcpPubSubAcker {
                recv_msg,
                subscription_id: self.subscription_id.clone(),
            }),
            payload: Some(payload),
        })
    }
}

/// Acker for a single message received from GCP Pub/Sub.
pub struct GcpPubSubAcker {
// The received message that ack/nack are issued through. Its data bytes have already
// been moved out by the consumer, so only the metadata remains useful here.
recv_msg: ReceivedMessage,
// Id of the subscription the message came from; in this file it is only used for
// debug output.
subscription_id: Arc<String>,
}

/// Debug output identifies the message by ack id, message id and subscription id.
impl std::fmt::Debug for GcpPubSubAcker {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut repr = f.debug_struct("GcpPubSubAcker");
        repr.field("ack_id", &self.recv_msg.ack_id());
        repr.field("message_id", &self.recv_msg.message.message_id);
        repr.field("subscription_id", &self.subscription_id);
        repr.finish()
    }
}

#[async_trait]
impl Acker for GcpPubSubAcker {
    /// Acknowledges the message, mapping any failure to a generic `QueueError`.
    async fn ack(&mut self) -> Result<(), QueueError> {
        let outcome = self.recv_msg.ack().await;
        outcome.map_err(QueueError::generic)
    }

    /// Negatively acknowledges the message, mapping any failure to a generic `QueueError`.
    async fn nack(&mut self) -> Result<(), QueueError> {
        let outcome = self.recv_msg.nack().await;
        outcome.map_err(QueueError::generic)
    }
}
2 changes: 2 additions & 0 deletions omniqueue/src/backends/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// GCP Pub/Sub backend; compiled only when the `gcp_pubsub` feature is enabled.
#[cfg(feature = "gcp_pubsub")]
pub mod gcp_pubsub;
#[cfg(feature = "memory_queue")]
pub mod memory_queue;
#[cfg(feature = "rabbitmq")]
Expand Down
Loading