Skip to content

Commit

Permalink
wip: test
Browse files Browse the repository at this point in the history
And again it doesn't f_ing work locally
  • Loading branch information
svix-jplatte committed Jun 20, 2024
1 parent 0509e2a commit 8b6e6be
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 49 deletions.
2 changes: 1 addition & 1 deletion bridge/svix-bridge-plugin-kafka/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct KafkaProducer {
}

impl KafkaProducer {
pub(crate) fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
pub fn new(name: String, opts: KafkaOutputOpts) -> Result<Self, KafkaError> {
let topic = opts.topic.clone();
let producer = opts.create_producer()?;

Expand Down
51 changes: 3 additions & 48 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
use std::time::Duration;

use rdkafka::{
admin::{AdminClient, NewTopic, TopicReplication},
client::DefaultClientContext,
producer::{FutureProducer, FutureRecord},
types::RDKafkaErrorCode,
util::Timeout,
ClientConfig,
};
Expand All @@ -25,6 +22,8 @@ use wiremock::{
Mock, MockServer, ResponseTemplate,
};

use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};

#[ctor::ctor]
fn test_setup() {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -44,10 +43,8 @@ fn test_setup() {

/// Time to wait for the plugin to connect.
const CONNECT_WAIT_TIME: Duration = Duration::from_secs(10);
/// Teimt to wait for the plugin to receive a message sent by a test.
/// Time to wait for the plugin to receive a message sent by a test.
const CONSUME_WAIT_TIME: Duration = Duration::from_secs(1);
/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange...
const BROKER_HOST: &str = "localhost:9094";

fn get_test_plugin(
svix_url: String,
Expand Down Expand Up @@ -87,14 +84,6 @@ fn kafka_producer() -> FutureProducer {
.unwrap()
}

fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
// create does block I/O, but we don't care in tests
ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.create()
.unwrap()
}

async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
info!(topic, "publishing message");
producer
Expand All @@ -106,40 +95,6 @@ async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) {
.unwrap();
}

async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
if let Err(e) = admin_client
.create_topics(&[new_topic], &Default::default())
.await
{
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
panic!("{e}");
}
}
}

async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
admin_client
.delete_topics(&[topic], &Default::default())
.await
.unwrap();
}

macro_rules! unique_topic_name {
() => {
&format!(
"test_{}_{}",
file!()
.split('/')
.next_back()
.unwrap()
.strip_suffix(".rs")
.unwrap(),
line!()
)
};
}

/// Push a msg on the queue.
/// Check to see if the svix server sees a request.
#[tokio::test]
Expand Down
57 changes: 57 additions & 0 deletions bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use rdkafka::{
consumer::{Consumer, StreamConsumer},
ClientConfig, Message,
};
use serde_json::json;
use svix_bridge_plugin_kafka::{KafkaOutputOpts, KafkaProducer};
use svix_bridge_types::{ForwardRequest, ReceiverOutput as _};

use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST};

/// Time to wait for the consumer to be properly subscriber.
const SUBSCRIBER_WAIT_TIME: Duration = Duration::from_secs(5);

#[tokio::test]
async fn test_produce_ok() {
let topic = unique_topic_name!();
let admin_client = kafka_admin_client();
create_topic(&admin_client, topic).await;

let producer = KafkaProducer::new(
"test".into(),
KafkaOutputOpts {
bootstrap_brokers: BROKER_HOST.to_owned(),
topic: topic.to_owned(),
security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext,
debug_contexts: None,
},
)
.unwrap();

let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.set("group.id", "svix_bridge_test_group_id")
.create()
.unwrap();

consumer.subscribe(&[topic]).unwrap();
tokio::time::sleep(SUBSCRIBER_WAIT_TIME).await;
dbg!();

let payload = json!({ "test": "payload" });
let payload_s = payload.to_string();
producer.handle(ForwardRequest { payload }).await.unwrap();
dbg!();

let msg = consumer.recv().await.unwrap();
dbg!();
assert_eq!(msg.payload(), Some(payload_s.as_bytes()));

tokio::time::timeout(Duration::from_secs(1), consumer.recv())
.await
.expect_err("there must be no further messages");

delete_topic(&admin_client, topic).await;
}
53 changes: 53 additions & 0 deletions bridge/svix-bridge-plugin-kafka/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -1 +1,54 @@
use rdkafka::{
admin::{AdminClient, NewTopic, TopicReplication},
client::DefaultClientContext,
types::RDKafkaErrorCode,
ClientConfig,
};

/// These tests assume a "vanilla" kafka instance, using the default port, creds, exchange...
const BROKER_HOST: &str = "localhost:9094";

fn kafka_admin_client() -> AdminClient<DefaultClientContext> {
// create does block I/O, but we don't care in tests
ClientConfig::new()
.set("bootstrap.servers", BROKER_HOST)
.create()
.unwrap()
}

async fn create_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
if let Err(e) = admin_client
.create_topics(&[new_topic], &Default::default())
.await
{
if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) {
panic!("{e}");
}
}
}

async fn delete_topic(admin_client: &AdminClient<DefaultClientContext>, topic: &str) {
admin_client
.delete_topics(&[topic], &Default::default())
.await
.unwrap();
}

macro_rules! unique_topic_name {
() => {
&format!(
"test_{}_{}",
file!()
.split('/')
.next_back()
.unwrap()
.strip_suffix(".rs")
.unwrap(),
line!()
)
};
}

mod kafka_consumer;
mod kafka_producer;

0 comments on commit 8b6e6be

Please sign in to comment.