From 570340271cd678e31ce7e07eccfa54561b857e08 Mon Sep 17 00:00:00 2001 From: ConductorPete <57824285+ConductorPete@users.noreply.github.com> Date: Thu, 4 Nov 2021 07:15:27 +1000 Subject: [PATCH] Allow passing through a message key for Kafka executor (#1021) * Allow passing through a message key for Kafka executor * Add Kafka message key to website usage page in executors documentation * Add unit test for with/without Kafka executor message key --- builtin/bins/dkron-executor-kafka/kafka.go | 2 ++ .../bins/dkron-executor-kafka/kafka_test.go | 24 +++++++++++++++++-- website/content/usage/executors/kafka.md | 4 +++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/builtin/bins/dkron-executor-kafka/kafka.go b/builtin/bins/dkron-executor-kafka/kafka.go index 6dea74950..7334f7c25 100644 --- a/builtin/bins/dkron-executor-kafka/kafka.go +++ b/builtin/bins/dkron-executor-kafka/kafka.go @@ -26,6 +26,7 @@ type Kafka struct { // "executor": "kafka", // "executor_config": { // "brokerAddress": "192.168.59.103:9092", // kafka broker url +// "key": "", // "message": "", // "topic": "publishTopic" // } @@ -78,6 +79,7 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { msg := &sarama.ProducerMessage{ Topic: args.Config["topic"], + Key: sarama.StringEncoder(args.Config["key"]), Value: sarama.StringEncoder(args.Config["message"]), } diff --git a/builtin/bins/dkron-executor-kafka/kafka_test.go b/builtin/bins/dkron-executor-kafka/kafka_test.go index 79b171f46..2c67a7851 100644 --- a/builtin/bins/dkron-executor-kafka/kafka_test.go +++ b/builtin/bins/dkron-executor-kafka/kafka_test.go @@ -7,12 +7,13 @@ import ( dktypes "github.com/distribworks/dkron/v3/plugin/types" ) -func TestProduceExecute(t *testing.T) { +func TestProduceExecuteWithKey(t *testing.T) { pa := &dktypes.ExecuteRequest{ - JobName: "testJob", + JobName: "testJobWithKey", Config: map[string]string{ "topic": "test", "brokerAddress": "testaddress", + "key": "testkey", "message": "{\"hello\":11}", "debug": "true", }, @@ -25,3 +26,22 @@ func TestProduceExecute(t *testing.T) { t.Fatal(err) } } + +func TestProduceExecuteWithoutKey(t *testing.T) { + pa := &dktypes.ExecuteRequest{ + JobName: "testJobWithoutKey", + Config: map[string]string{ + "topic": "test", + "brokerAddress": "testaddress", + "message": "{\"hello\":11}", + "debug": "true", + }, + } + kafka := &Kafka{} + output, err := kafka.Execute(pa, nil) + fmt.Println(string(output.Output)) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} \ No newline at end of file diff --git a/website/content/usage/executors/kafka.md b/website/content/usage/executors/kafka.md index 01c62925a..e0af7c737 100644 --- a/website/content/usage/executors/kafka.md +++ b/website/content/usage/executors/kafka.md @@ -11,7 +11,8 @@ Params ``` brokerAddress: "IP:port" of the broker -message: The message to produce +key: The key of the message to produce +message: The body of the message to produce topic: The Kafka topic for this message debug: Turns on debugging output if not empty ``` @@ -22,6 +23,7 @@ Example "executor": "kafka", "executor_config": { "brokerAddress": "localhost:9092", + "key": "My key", "message": "My message", "topic": "my_topic" }