diff --git a/bridge/svix-bridge-plugin-queue/src/config.rs b/bridge/svix-bridge-plugin-queue/src/config.rs index 216f5d0e5..ca9f34f87 100644 --- a/bridge/svix-bridge-plugin-queue/src/config.rs +++ b/bridge/svix-bridge-plugin-queue/src/config.rs @@ -12,45 +12,38 @@ pub use crate::{ sqs::{SqsInputOpts, SqsOutputOpts}, }; -#[derive(Deserialize)] -pub struct QueueSenderConfig { - pub name: String, - pub input: SenderInputOpts, - #[serde(default)] - pub transformation: Option, - pub output: SenderOutputOpts, -} - -impl QueueSenderConfig { - pub fn into_sender_input(self) -> Result, &'static str> { - // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? - if matches!(self.input, SenderInputOpts::Redis(_)) - && self - .transformation - .as_ref() - .map(|t| t.format() != TransformerInputFormat::Json) - .unwrap_or_default() - { - return Err("redis only supports json formatted transformations"); - } - - Ok(Box::new(QueueSender::new( - self.name, - self.input, - self.transformation, - self.output, - ))) +pub fn into_sender_input( + name: String, + input_opts: QueueInputOpts, + transformation: Option, + output: SenderOutputOpts, +) -> Result, &'static str> { + // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? + if matches!(input_opts, QueueInputOpts::Redis(_)) + && transformation + .as_ref() + .map(|t| t.format() != TransformerInputFormat::Json) + .unwrap_or_default() + { + return Err("redis only supports json formatted transformations"); } + + Ok(Box::new(QueueSender::new( + name, + input_opts, + transformation, + output, + ))) } pub async fn into_receiver_output( name: String, - opts: ReceiverOutputOpts, + opts: QueueOutputOpts, // Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json). transformation: Option<&TransformationConfig>, ) -> Result, crate::Error> { // FIXME: see if this check is still needed. String transforms worked for the omniqueue redis receiver, I think? - if matches!(opts, ReceiverOutputOpts::Redis(_)) + if matches!(opts, QueueOutputOpts::Redis(_)) && transformation .as_ref() .map(|t| t.format() != TransformerInputFormat::Json) @@ -68,7 +61,7 @@ pub async fn into_receiver_output( // TODO: feature flag the variants, thread the features down through to generic-queue #[derive(Debug, Deserialize)] #[serde(tag = "type", rename_all = "lowercase")] -pub enum SenderInputOpts { +pub enum QueueInputOpts { #[serde(rename = "gcp-pubsub")] GCPPubSub(GCPPubSubInputOpts), RabbitMQ(RabbitMqInputOpts), @@ -78,7 +71,7 @@ pub enum SenderInputOpts { #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type", rename_all = "lowercase")] -pub enum ReceiverOutputOpts { +pub enum QueueOutputOpts { #[serde(rename = "gcp-pubsub")] GCPPubSub(GCPPubSubOutputOpts), RabbitMQ(RabbitMqOutputOpts), @@ -92,9 +85,9 @@ mod tests { SenderOutputOpts, SvixSenderOutputOpts, TransformationConfig, TransformerInputFormat, }; - use super::{into_receiver_output, QueueSenderConfig}; + use super::{into_receiver_output, into_sender_input}; use crate::{ - config::{ReceiverOutputOpts, SenderInputOpts}, + config::{QueueInputOpts, QueueOutputOpts}, redis::{RedisInputOpts, RedisOutputOpts}, }; @@ -102,41 +95,40 @@ mod tests { // Revisit after `omniqueue` adoption. #[test] fn redis_sender_with_string_transformation_is_err() { - let cfg = QueueSenderConfig { - name: "redis-with-string-transformation".to_string(), - input: SenderInputOpts::Redis(RedisInputOpts { - dsn: "".to_string(), - max_connections: 0, - reinsert_on_nack: false, - queue_key: "".to_string(), - delayed_queue_key: None, - consumer_group: "".to_string(), - consumer_name: "".to_string(), - ack_deadline_ms: 2_000, - }), - transformation: Some(TransformationConfig::Explicit { + let input_opts = QueueInputOpts::Redis(RedisInputOpts { + dsn: "".to_string(), + max_connections: 0, + reinsert_on_nack: false, + queue_key: "".to_string(), + delayed_queue_key: None, + consumer_group: "".to_string(), + consumer_name: "".to_string(), + ack_deadline_ms: 2_000, + }); + + let err = into_sender_input( + "redis-with-string-transformation".to_owned(), + input_opts, + Some(TransformationConfig::Explicit { format: TransformerInputFormat::String, src: String::new(), }), - output: SenderOutputOpts::Svix(SvixSenderOutputOpts { + SenderOutputOpts::Svix(SvixSenderOutputOpts { token: "".to_string(), options: None, }), - }; - - assert_eq!( - cfg.into_sender_input() - .err() - .expect("invalid config didn't result in error"), - "redis only supports json formatted transformations" ) + .err() + .expect("invalid config didn't result in error"); + + assert_eq!(err, "redis only supports json formatted transformations") } // FIXME: can't support raw payload access for redis because it requires JSON internally. // Revisit after `omniqueue` adoption. #[tokio::test] async fn test_redis_receiver_string_transform_is_err() { - let redis_out = ReceiverOutputOpts::Redis(RedisOutputOpts { + let redis_out = QueueOutputOpts::Redis(RedisOutputOpts { dsn: "".to_string(), max_connections: 0, queue_key: "".to_string(), diff --git a/bridge/svix-bridge-plugin-queue/src/lib.rs b/bridge/svix-bridge-plugin-queue/src/lib.rs index d7f6d394e..6e047d265 100644 --- a/bridge/svix-bridge-plugin-queue/src/lib.rs +++ b/bridge/svix-bridge-plugin-queue/src/lib.rs @@ -18,7 +18,8 @@ mod redis; pub mod sender_input; mod sqs; -use error::Error; +pub use self::config::{into_receiver_output, into_sender_input}; +use self::error::Error; /// Newtype for [`omniqueue::queue::Delivery`]. /// diff --git a/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs b/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs index dc889722e..99bdc5726 100644 --- a/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs +++ b/bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use omniqueue::DynProducer; use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput}; -use crate::{config::ReceiverOutputOpts, error::Result}; +use crate::{config::QueueOutputOpts, error::Result}; #[derive(Clone)] pub struct QueueForwarder { @@ -16,13 +16,13 @@ pub struct QueueForwarder { impl QueueForwarder { pub async fn from_receiver_output_opts( name: String, - opts: ReceiverOutputOpts, + opts: QueueOutputOpts, ) -> Result { let sender = match opts { - ReceiverOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?, - ReceiverOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?, - ReceiverOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?, - ReceiverOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?, + QueueOutputOpts::GCPPubSub(cfg) => crate::gcp_pubsub::producer(&cfg).await?, + QueueOutputOpts::RabbitMQ(cfg) => crate::rabbitmq::producer(&cfg).await?, + QueueOutputOpts::Redis(cfg) => crate::redis::producer(&cfg).await?, + QueueOutputOpts::SQS(cfg) => crate::sqs::producer(&cfg).await?, }; Ok(QueueForwarder { name, diff --git a/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs b/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs index b56062208..54050f69e 100644 --- a/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs +++ b/bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs @@ -4,15 +4,13 @@ use svix_bridge_types::{ TransformerTx, }; -use crate::{ - config::SenderInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer, -}; +use crate::{config::QueueInputOpts, error::Error, gcp_pubsub, rabbitmq, run_inner, sqs, Consumer}; pub struct QueueSender { name: String, source: String, system: String, - input_opts: SenderInputOpts, + input_opts: QueueInputOpts, transformation: Option, transformer_tx: Option, svix_client: Svix, @@ -24,28 +22,28 @@ impl std::fmt::Debug for QueueSender { } } -fn system_name(opts: &SenderInputOpts) -> &'static str { +fn system_name(opts: &QueueInputOpts) -> &'static str { match opts { - SenderInputOpts::GCPPubSub(_) => "gcp-pubsub", - SenderInputOpts::RabbitMQ(_) => "rabbitmq", - SenderInputOpts::Redis(_) => "redis", - SenderInputOpts::SQS(_) => "sqs", + QueueInputOpts::GCPPubSub(_) => "gcp-pubsub", + QueueInputOpts::RabbitMQ(_) => "rabbitmq", + QueueInputOpts::Redis(_) => "redis", + QueueInputOpts::SQS(_) => "sqs", } } -fn source_name(opts: &SenderInputOpts) -> &str { +fn source_name(opts: &QueueInputOpts) -> &str { match opts { - SenderInputOpts::GCPPubSub(opts) => &opts.subscription_id, - SenderInputOpts::RabbitMQ(opts) => &opts.queue_name, - SenderInputOpts::Redis(opts) => &opts.queue_key, - SenderInputOpts::SQS(opts) => &opts.queue_dsn, + QueueInputOpts::GCPPubSub(opts) => &opts.subscription_id, + QueueInputOpts::RabbitMQ(opts) => &opts.queue_name, + QueueInputOpts::Redis(opts) => &opts.queue_key, + QueueInputOpts::SQS(opts) => &opts.queue_dsn, } } impl QueueSender { pub fn new( name: String, - input: SenderInputOpts, + input: QueueInputOpts, transformation: Option, output: SenderOutputOpts, ) -> Self { @@ -89,10 +87,10 @@ impl Consumer for QueueSender { async fn consumer(&self) -> std::io::Result { Ok(match &self.input_opts { - SenderInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await, - SenderInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await, - SenderInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await, - SenderInputOpts::SQS(cfg) => sqs::consumer(cfg).await, + QueueInputOpts::GCPPubSub(cfg) => gcp_pubsub::consumer(cfg).await, + QueueInputOpts::RabbitMQ(cfg) => rabbitmq::consumer(cfg).await, + QueueInputOpts::Redis(cfg) => crate::redis::consumer(cfg).await, + QueueInputOpts::SQS(cfg) => sqs::consumer(cfg).await, } .map_err(Error::from)?) } diff --git a/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs index 3926adb82..01fd3ab62 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/gcp_pubsub_consumer.rs @@ -13,7 +13,7 @@ use google_cloud_pubsub::{ }; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{GCPPubSubInputOpts, SenderInputOpts}, + config::{GCPPubSubInputOpts, QueueInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -35,7 +35,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::GCPPubSub(GCPPubSubInputOpts { + QueueInputOpts::GCPPubSub(GCPPubSubInputOpts { subscription_id, credentials_file: None, }), diff --git a/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs index 3f889efaa..7c2cd6d16 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/rabbitmq_consumer.rs @@ -10,7 +10,7 @@ use lapin::{ }; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{RabbitMqInputOpts, SenderInputOpts}, + config::{QueueInputOpts, RabbitMqInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -31,7 +31,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri: mq_uri.to_string(), queue_name: queue_name.to_string(), consumer_tag: None, diff --git a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs index d4ff4b5c7..eea291774 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs @@ -6,7 +6,7 @@ use std::time::Duration; use redis::{AsyncCommands, Client}; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{RedisInputOpts, SenderInputOpts}, + config::{QueueInputOpts, RedisInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -26,7 +26,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::Redis(RedisInputOpts { + QueueInputOpts::Redis(RedisInputOpts { dsn: "redis://localhost/".to_owned(), max_connections: 8, reinsert_on_nack: false, diff --git a/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs index 69e3c0135..9343cb414 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/sqs_consumer.rs @@ -8,7 +8,7 @@ use std::time::Duration; use aws_sdk_sqs::Client; use serde_json::json; use svix_bridge_plugin_queue::{ - config::{SenderInputOpts, SqsInputOpts}, + config::{QueueInputOpts, SqsInputOpts}, sender_input::QueueSender, }; use svix_bridge_types::{ @@ -35,7 +35,7 @@ fn get_test_plugin( ) -> QueueSender { QueueSender::new( "test".into(), - SenderInputOpts::SQS(SqsInputOpts { + QueueInputOpts::SQS(SqsInputOpts { queue_dsn, override_endpoint: true, }), diff --git a/bridge/svix-bridge/src/config/mod.rs b/bridge/svix-bridge/src/config/mod.rs index be29e9be9..009449ebc 100644 --- a/bridge/svix-bridge/src/config/mod.rs +++ b/bridge/svix-bridge/src/config/mod.rs @@ -10,10 +10,10 @@ use std::{ use serde::Deserialize; use shellexpand::LookupError; -use svix_bridge_plugin_queue::config::{ - into_receiver_output, QueueSenderConfig, ReceiverOutputOpts as QueueOutOpts, +use svix_bridge_plugin_queue::config::{QueueInputOpts, QueueOutputOpts}; +use svix_bridge_types::{ + ReceiverInputOpts, ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig, }; -use svix_bridge_types::{ReceiverInputOpts, ReceiverOutput, SenderInput, TransformationConfig}; use tracing::Level; #[derive(Deserialize)] @@ -144,31 +144,49 @@ pub enum LogFormat { } /// Config for reading messages from plugins and forwarding to Svix. +#[derive(Deserialize)] +pub struct WebhookSenderConfig { + pub name: String, + pub input: SenderInputOpts, + #[serde(default)] + pub transformation: Option, + pub output: SenderOutputOpts, +} + #[derive(Deserialize)] #[serde(untagged)] -pub enum WebhookSenderConfig { - Queue(QueueSenderConfig), +pub enum SenderInputOpts { + Queue(QueueInputOpts), } impl WebhookSenderConfig { - pub fn name(&self) -> &str { - match self { - WebhookSenderConfig::Queue(cfg) => &cfg.name, + pub fn into_sender_input(self) -> Result, &'static str> { + match self.input { + SenderInputOpts::Queue(input_opts) => svix_bridge_plugin_queue::into_sender_input( + self.name, + input_opts, + self.transformation, + self.output, + ), } } +} + +impl WebhookSenderConfig { + pub fn name(&self) -> &str { + &self.name + } + pub fn transformation(&self) -> Option<&TransformationConfig> { - match self { - WebhookSenderConfig::Queue(cfg) => cfg.transformation.as_ref(), - } + self.transformation.as_ref() } } impl TryFrom for Box { type Error = &'static str; + fn try_from(value: WebhookSenderConfig) -> Result { - match value { - WebhookSenderConfig::Queue(backend) => backend.into_sender_input(), - } + value.into_sender_input() } } @@ -179,23 +197,25 @@ pub struct WebhookReceiverConfig { pub input: ReceiverInputOpts, #[serde(default)] pub transformation: Option, - pub output: ReceiverOut, + pub output: ReceiverOutputOpts, } #[derive(Deserialize)] #[serde(untagged)] -pub enum ReceiverOut { - Queue(QueueOutOpts), +pub enum ReceiverOutputOpts { + Queue(QueueOutputOpts), } impl WebhookReceiverConfig { pub async fn into_receiver_output(self) -> std::io::Result> { match self.output { - ReceiverOut::Queue(x) => { - into_receiver_output(self.name.clone(), x, self.transformation.as_ref()) - .await - .map_err(Into::into) - } + ReceiverOutputOpts::Queue(x) => svix_bridge_plugin_queue::into_receiver_output( + self.name.clone(), + x, + self.transformation.as_ref(), + ) + .await + .map_err(Into::into), } } } diff --git a/bridge/svix-bridge/src/config/tests.rs b/bridge/svix-bridge/src/config/tests.rs index 7145c545e..c294d0614 100644 --- a/bridge/svix-bridge/src/config/tests.rs +++ b/bridge/svix-bridge/src/config/tests.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; -use svix_bridge_plugin_queue::config::{QueueSenderConfig, RabbitMqInputOpts, SenderInputOpts}; +use svix_bridge_plugin_queue::config::{QueueInputOpts, RabbitMqInputOpts}; use svix_bridge_types::{SenderOutputOpts, SvixSenderOutputOpts}; -use super::Config; +use super::{Config, SenderInputOpts}; use crate::config::{LogFormat, LogLevel, WebhookSenderConfig}; /// This is meant to be a kitchen sink config, hitting as many possible @@ -455,14 +455,14 @@ fn test_variable_substitution_repeated_lookups() { vars.insert(String::from("SVIX_TOKEN"), String::from("x")); let cfg = Config::from_src(src, Some(&vars)).unwrap(); - if let WebhookSenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig { input: - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + SenderInputOpts::Queue(QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. - }), + })), output: SenderOutputOpts::Svix(SvixSenderOutputOpts { token, .. }), .. - }) = &cfg.senders[0] + } = &cfg.senders[0] { assert_eq!(uri, "amqp://guest:guest@localhost:5672/%2f"); assert_eq!(queue_name, "one"); @@ -471,14 +471,14 @@ fn test_variable_substitution_repeated_lookups() { panic!("sender did not match expected pattern"); } - if let WebhookSenderConfig::Queue(QueueSenderConfig { + if let WebhookSenderConfig { input: - SenderInputOpts::RabbitMQ(RabbitMqInputOpts { + SenderInputOpts::Queue(QueueInputOpts::RabbitMQ(RabbitMqInputOpts { uri, queue_name, .. - }), + })), output: SenderOutputOpts::Svix(SvixSenderOutputOpts { token, .. }), .. - }) = &cfg.senders[1] + } = &cfg.senders[1] { assert_eq!(uri, "amqp://guest:guest@localhost:5672/%2f"); assert_eq!(queue_name, "two");