From 8b6e6be6e4702bdc2f6f05f8f705e311127845a1 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 20 Jun 2024 16:36:20 +0200 Subject: [PATCH] wip: test And again it doesn't f_ing work locally --- bridge/svix-bridge-plugin-kafka/src/output.rs | 2 +- .../tests/it/kafka_consumer.rs | 51 +---------------- .../tests/it/kafka_producer.rs | 57 +++++++++++++++++++ .../svix-bridge-plugin-kafka/tests/it/main.rs | 53 +++++++++++++++++ 4 files changed, 114 insertions(+), 49 deletions(-) create mode 100644 bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs diff --git a/bridge/svix-bridge-plugin-kafka/src/output.rs b/bridge/svix-bridge-plugin-kafka/src/output.rs index 69faac059..beb403ca0 100644 --- a/bridge/svix-bridge-plugin-kafka/src/output.rs +++ b/bridge/svix-bridge-plugin-kafka/src/output.rs @@ -15,7 +15,7 @@ pub struct KafkaProducer { } impl KafkaProducer { - pub(crate) fn new(name: String, opts: KafkaOutputOpts) -> Result { + pub fn new(name: String, opts: KafkaOutputOpts) -> Result { let topic = opts.topic.clone(); let producer = opts.create_producer()?; diff --git a/bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs b/bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs index dbf4779a8..30f793740 100644 --- a/bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs +++ b/bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs @@ -5,10 +5,7 @@ use std::time::Duration; use rdkafka::{ - admin::{AdminClient, NewTopic, TopicReplication}, - client::DefaultClientContext, producer::{FutureProducer, FutureRecord}, - types::RDKafkaErrorCode, util::Timeout, ClientConfig, }; @@ -25,6 +22,8 @@ use wiremock::{ Mock, MockServer, ResponseTemplate, }; +use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST}; + #[ctor::ctor] fn test_setup() { use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -44,10 +43,8 @@ fn test_setup() { /// Time to wait for the plugin to connect. const CONNECT_WAIT_TIME: Duration = Duration::from_secs(10); -/// Teimt to wait for the plugin to receive a message sent by a test. +/// Time to wait for the plugin to receive a message sent by a test. const CONSUME_WAIT_TIME: Duration = Duration::from_secs(1); -/// These tests assume a "vanilla" rabbitmq instance, using the default port, creds, exchange... -const BROKER_HOST: &str = "localhost:9094"; fn get_test_plugin( svix_url: String, @@ -87,14 +84,6 @@ fn kafka_producer() -> FutureProducer { .unwrap() } -fn kafka_admin_client() -> AdminClient { - // create does block I/O, but we don't care in tests - ClientConfig::new() - .set("bootstrap.servers", BROKER_HOST) - .create() - .unwrap() -} - async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) { info!(topic, "publishing message"); producer @@ -106,40 +95,6 @@ async fn publish(producer: &FutureProducer, topic: &str, payload: &[u8]) { .unwrap(); } -async fn create_topic(admin_client: &AdminClient, topic: &str) { - let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1)); - if let Err(e) = admin_client - .create_topics(&[new_topic], &Default::default()) - .await - { - if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) { - panic!("{e}"); - } - } -} - -async fn delete_topic(admin_client: &AdminClient, topic: &str) { - admin_client - .delete_topics(&[topic], &Default::default()) - .await - .unwrap(); -} - -macro_rules! unique_topic_name { - () => { - &format!( - "test_{}_{}", - file!() - .split('/') - .next_back() - .unwrap() - .strip_suffix(".rs") - .unwrap(), - line!() - ) - }; -} - /// Push a msg on the queue. /// Check to see if the svix server sees a request. #[tokio::test] diff --git a/bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs b/bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs new file mode 100644 index 000000000..43ed01110 --- /dev/null +++ b/bridge/svix-bridge-plugin-kafka/tests/it/kafka_producer.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + ClientConfig, Message, +}; +use serde_json::json; +use svix_bridge_plugin_kafka::{KafkaOutputOpts, KafkaProducer}; +use svix_bridge_types::{ForwardRequest, ReceiverOutput as _}; + +use crate::{create_topic, delete_topic, kafka_admin_client, BROKER_HOST}; + +/// Time to wait for the consumer to be properly subscriber. +const SUBSCRIBER_WAIT_TIME: Duration = Duration::from_secs(5); + +#[tokio::test] +async fn test_produce_ok() { + let topic = unique_topic_name!(); + let admin_client = kafka_admin_client(); + create_topic(&admin_client, topic).await; + + let producer = KafkaProducer::new( + "test".into(), + KafkaOutputOpts { + bootstrap_brokers: BROKER_HOST.to_owned(), + topic: topic.to_owned(), + security_protocol: svix_bridge_plugin_kafka::KafkaSecurityProtocol::Plaintext, + debug_contexts: None, + }, + ) + .unwrap(); + + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", BROKER_HOST) + .set("group.id", "svix_bridge_test_group_id") + .create() + .unwrap(); + + consumer.subscribe(&[topic]).unwrap(); + tokio::time::sleep(SUBSCRIBER_WAIT_TIME).await; + dbg!(); + + let payload = json!({ "test": "payload" }); + let payload_s = payload.to_string(); + producer.handle(ForwardRequest { payload }).await.unwrap(); + dbg!(); + + let msg = consumer.recv().await.unwrap(); + dbg!(); + assert_eq!(msg.payload(), Some(payload_s.as_bytes())); + + tokio::time::timeout(Duration::from_secs(1), consumer.recv()) + .await + .expect_err("there must be no further messages"); + + delete_topic(&admin_client, topic).await; +} diff --git a/bridge/svix-bridge-plugin-kafka/tests/it/main.rs b/bridge/svix-bridge-plugin-kafka/tests/it/main.rs index 726f2afd8..6cc0a90dc 100644 --- a/bridge/svix-bridge-plugin-kafka/tests/it/main.rs +++ b/bridge/svix-bridge-plugin-kafka/tests/it/main.rs @@ -1 +1,54 @@ +use rdkafka::{ + admin::{AdminClient, NewTopic, TopicReplication}, + client::DefaultClientContext, + types::RDKafkaErrorCode, + ClientConfig, +}; + +/// These tests assume a "vanilla" kafka instance, using the default port, creds, exchange... +const BROKER_HOST: &str = "localhost:9094"; + +fn kafka_admin_client() -> AdminClient { + // create does block I/O, but we don't care in tests + ClientConfig::new() + .set("bootstrap.servers", BROKER_HOST) + .create() + .unwrap() +} + +async fn create_topic(admin_client: &AdminClient, topic: &str) { + let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1)); + if let Err(e) = admin_client + .create_topics(&[new_topic], &Default::default()) + .await + { + if e.rdkafka_error_code() != Some(RDKafkaErrorCode::TopicAlreadyExists) { + panic!("{e}"); + } + } +} + +async fn delete_topic(admin_client: &AdminClient, topic: &str) { + admin_client + .delete_topics(&[topic], &Default::default()) + .await + .unwrap(); +} + +macro_rules! unique_topic_name { + () => { + &format!( + "test_{}_{}", + file!() + .split('/') + .next_back() + .unwrap() + .strip_suffix(".rs") + .unwrap(), + line!() + ) + }; +} + mod kafka_consumer; +mod kafka_producer;