bridge: Add kafka receiver output #1345

Merged: 1 commit, Jun 20, 2024
90 changes: 71 additions & 19 deletions bridge/svix-bridge-plugin-kafka/src/config.rs
@@ -1,8 +1,10 @@
use rdkafka::{consumer::StreamConsumer, error::KafkaResult, ClientConfig};
use rdkafka::{
consumer::StreamConsumer, error::KafkaResult, producer::FutureProducer, ClientConfig,
};
use serde::Deserialize;
use svix_bridge_types::{SenderInput, SenderOutputOpts, TransformationConfig};
use svix_bridge_types::{ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig};

use crate::{input::KafkaConsumer, Result};
use crate::{input::KafkaConsumer, KafkaProducer, Result};

#[derive(Clone, Deserialize)]
pub struct KafkaInputOpts {
@@ -40,25 +42,45 @@ impl KafkaInputOpts {
// messages are committed manually after webhook delivery was successful.
.set("enable.auto.commit", "false");

match self.security_protocol {
KafkaSecurityProtocol::Plaintext => {
config.set("security.protocol", "plaintext");
}
KafkaSecurityProtocol::Ssl => {
config.set("security.protocol", "ssl");
}
KafkaSecurityProtocol::SaslSsl {
sasl_username,
sasl_password,
} => {
config
.set("security.protocol", "sasl_ssl")
.set("sasl.mechanisms", "SCRAM-SHA-512")
.set("sasl.username", sasl_username)
.set("sasl.password", sasl_password);
self.security_protocol.apply(&mut config);
if let Some(debug_contexts) = self.debug_contexts {
if !debug_contexts.is_empty() {
config.set("debug", debug_contexts);
}
}

config.create()
}
}

#[derive(Clone, Deserialize)]
pub struct KafkaOutputOpts {
/// Comma-separated list of addresses.
///
/// Example: `localhost:9094`
#[serde(rename = "kafka_bootstrap_brokers")]
pub bootstrap_brokers: String,

/// The topic to publish to.
#[serde(rename = "kafka_topic")]
pub topic: String,

/// The value for 'security.protocol' in the kafka config.
#[serde(flatten)]
pub security_protocol: KafkaSecurityProtocol,

/// The 'debug' config value for rdkafka - enables more verbose logging
/// for the selected 'contexts'
#[serde(rename = "kafka_debug_contexts")]
pub debug_contexts: Option<String>,
}

impl KafkaOutputOpts {
pub(crate) fn create_producer(self) -> KafkaResult<FutureProducer> {
let mut config = ClientConfig::new();
config.set("bootstrap.servers", self.bootstrap_brokers);

self.security_protocol.apply(&mut config);
if let Some(debug_contexts) = self.debug_contexts {
if !debug_contexts.is_empty() {
config.set("debug", debug_contexts);
@@ -82,6 +104,29 @@ pub enum KafkaSecurityProtocol {
},
}

impl KafkaSecurityProtocol {
fn apply(self, config: &mut ClientConfig) {
match self {
KafkaSecurityProtocol::Plaintext => {
config.set("security.protocol", "plaintext");
}
KafkaSecurityProtocol::Ssl => {
config.set("security.protocol", "ssl");
}
KafkaSecurityProtocol::SaslSsl {
sasl_username,
sasl_password,
} => {
config
.set("security.protocol", "sasl_ssl")
.set("sasl.mechanisms", "SCRAM-SHA-512")
.set("sasl.username", sasl_username)
.set("sasl.password", sasl_password);
}
}
}
}

pub fn into_sender_input(
name: String,
opts: KafkaInputOpts,
@@ -95,3 +140,10 @@ pub fn into_sender_input(
output,
)?))
}

pub fn into_receiver_output(
name: String,
opts: KafkaOutputOpts,
) -> Result<Box<dyn ReceiverOutput>> {
Ok(Box::new(KafkaProducer::new(name, opts)?))
}
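
A minimal usage sketch (not part of this diff): constructing KafkaOutputOpts by hand and turning it into a boxed ReceiverOutput with the new into_receiver_output helper. The broker address, topic, and output name here are placeholders.

use svix_bridge_plugin_kafka::{into_receiver_output, KafkaOutputOpts, KafkaSecurityProtocol};
use svix_bridge_types::ReceiverOutput;

fn build_kafka_output() -> svix_bridge_plugin_kafka::Result<Box<dyn ReceiverOutput>> {
    // Placeholder values; a real deployment reads these from the bridge
    // config via the serde-renamed `kafka_*` keys shown above.
    let opts = KafkaOutputOpts {
        bootstrap_brokers: "localhost:9094".to_owned(),
        topic: "webhook-events".to_owned(),
        security_protocol: KafkaSecurityProtocol::Plaintext,
        debug_contexts: None,
    };
    into_receiver_output("kafka-out".to_owned(), opts)
}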
7 changes: 6 additions & 1 deletion bridge/svix-bridge-plugin-kafka/src/lib.rs
@@ -1,9 +1,14 @@
mod config;
mod error;
mod input;
mod output;

pub use self::{
config::{into_sender_input, KafkaInputOpts, KafkaSecurityProtocol},
config::{
into_receiver_output, into_sender_input, KafkaInputOpts, KafkaOutputOpts,
KafkaSecurityProtocol,
},
error::{Error, Result},
input::KafkaConsumer,
output::KafkaProducer,
};
48 changes: 48 additions & 0 deletions bridge/svix-bridge-plugin-kafka/src/output.rs
@@ -0,0 +1,48 @@
use rdkafka::{
error::KafkaError,
producer::{FutureProducer, FutureRecord},
util::Timeout,
};
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};

use crate::config::KafkaOutputOpts;

/// Forwards webhook payloads to kafka.
pub struct KafkaProducer {
name: String,
topic: String,
producer: FutureProducer,
}

impl KafkaProducer {
pub fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
let topic = opts.topic.clone();
let producer = opts.create_producer()?;

Ok(Self {
name,
topic,
producer,
})
}
}

#[async_trait]
impl ReceiverOutput for KafkaProducer {
fn name(&self) -> &str {
&self.name
}

async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
self.producer
.send(
FutureRecord::<(), _>::to(&self.topic)
.payload(&serde_json::to_vec(&request.payload)?),
Timeout::Never,
)
.await
.map_err(|(e, _msg)| e)?;

Ok(())
}
}
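
Two notes on this handler (context, not part of the diff): rdkafka's FutureProducer::send returns Err((KafkaError, OwnedMessage)) on failure, so the undelivered message is dropped and only the error is boxed; and FutureRecord::<(), _> pins the unused key type to (), meaning records are produced without a key and partition assignment is left to librdkafka. Timeout::Never applies to enqueueing: if the local producer queue is full, the send retries indefinitely instead of returning an error.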
51 changes: 3 additions & 48 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs
@@ -5,10 +5,7 @@
use std::time::Duration;

use rdkafka::{
admin::{AdminClient, NewTopic, TopicReplication},
client::DefaultClientContext,
producer::{FutureProducer, FutureRecord},
types::RDKafkaErrorCode,
util::Timeout,
ClientConfig,
};
@@ -25,6 +22,8 @@ use wiremock::{
Mock, MockServer, ResponseTemplate,
};

use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};

#[ctor::ctor]
fn test_setup() {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -44,10 +43,8 @@ fn test_setup() {

/// Time to wait for the plugin to connect.
const CONNECT_WAIT_TIME: Duration = Duration::from_secs(10);
/// Teimt to wait for the plugin to receive a message sent by a test.
/// Time to wait for the plugin to receive a message sent by a test.
const CONSUME_WAIT_TIME: Duration = Duration::from_secs(1);
/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange...
const BROKER_HOST: &str = "localhost:9094";

fn get_test_plugin(
svix_url: String,
@@ -87,14 +84,6 @@ fn kafka_producer() -> FutureProducer {
.unwrap()
}

fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
// create does block I/O, but we don't care in tests
ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.create()
.unwrap()
}

async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
info!(topic, "publishing message");
producer
@@ -106,40 +95,6 @@ async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
.unwrap();
}

async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
if let Err(e) = admin_client
.create_topics(&[new_topic], &Default::default())
.await
{
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
panic!("{e}");
}
}
}

async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
admin_client
.delete_topics(&[topic], &Default::default())
.await
.unwrap();
}

macro_rules! unique_topic_name {
() => {
&format!(
"test_{}_{}",
file!()
.split('/')
.next_back()
.unwrap()
.strip_suffix(".rs")
.unwrap(),
line!()
)
};
}

/// Push a msg on the queue.
/// Check to see if the svix server sees a request.
#[tokio::test]
64 changes: 64 additions & 0 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs
@@ -0,0 +1,64 @@
use std::{sync::Arc, time::Duration};

use rdkafka::{
consumer::{Consumer, StreamConsumer},
ClientConfig, Message,
};
use serde_json::json;
use svix_bridge_plugin_kafka::{KafkaOutputOpts, KafkaProducer};
use svix_bridge_types::{ForwardRequest, ReceiverOutput as _};

use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};

/// Time to wait for the consumer to be properly listening.
const LISTEN_WAIT_TIME: Duration = Duration::from_secs(5);

#[tokio::test]
async fn test_produce_ok() {
let topic = unique_topic_name!();
let admin_client = kafka_admin_client();
create_topic(&admin_client, topic).await;

// Start listening for messages
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.set("group.id", "svix_bridge_test_group_id")
.create()
.unwrap();

consumer.subscribe(&[topic]).unwrap();

let consumer = Arc::new(consumer);
let recv_join_hdl = tokio::spawn({
let consumer = consumer.clone();
async move { consumer.recv().await.unwrap().detach() }
});
tokio::time::sleep(LISTEN_WAIT_TIME).await;

let payload = json!({ "test": "payload" });
let payload_s = payload.to_string();

// Only then actually send a message
let producer = KafkaProducer::new(
"test".into(),
KafkaOutputOpts {
bootstrap_brokers: BROKER_HOST.to_owned(),
topic: topic.to_owned(),
security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext,
debug_contexts: None,
},
)
.unwrap();
producer.handle(ForwardRequest { payload }).await.unwrap();

// Assert that the message is received
let msg = recv_join_hdl.await.unwrap();
assert_eq!(msg.payload(), Some(payload_s.as_bytes()));

// Assert that no further messages are received in the next second
tokio::time::timeout(Duration::from_secs(1), consumer.recv())
.await
.expect_err("there must be no further messages");

delete_topic(&admin_client, topic).await;
}
53 changes: 53 additions & 0 deletions bridge/svix-bridge-plugin-kafka/tests/it/main.rs
@@ -1 +1,54 @@
use rdkafka::{
admin::{AdminClient, NewTopic, TopicReplication},
client::DefaultClientContext,
types::RDKafkaErrorCode,
ClientConfig,
};

/// These tests assume a "vanilla" kafka instance, using the default port, creds, etc.
const BROKER_HOST: &str = "localhost:9094";

fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
// create does block I/O, but we don't care in tests
ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.create()
.unwrap()
}

async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
if let Err(e) = admin_client
.create_topics(&[new_topic], &Default::default())
.await
{
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
panic!("{e}");
}
}
}

async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
admin_client
.delete_topics(&[topic], &Default::default())
.await
.unwrap();
}

macro_rules! unique_topic_name {
() => {
&format!(
"test_{}_{}",
file!()
.split('/')
.next_back()
.unwrap()
.strip_suffix(".rs")
.unwrap(),
line!()
)
};
}

mod kafka_consumer;
mod kafka_producer;
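
A note on the shared helpers: macro_rules! macros are resolved in textual order, so unique_topic_name! is usable from both test modules only because its definition in main.rs precedes the mod kafka_consumer; and mod kafka_producer; declarations.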