From 6717f916dd3e017b3ffea6d1b88720a02cca60ff Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 15 Dec 2022 07:21:57 +0800 Subject: [PATCH] Update kafka config (#499) * Update kafka config Signed-off-by: Kevin Su * nit Signed-off-by: Kevin Su * update cnofig Signed-off-by: Kevin Su * Update pkg/runtime/interfaces/application_configuration.go Co-authored-by: Dan Rammer Signed-off-by: Kevin Su Signed-off-by: Kevin Su Signed-off-by: Kevin Su Co-authored-by: Dan Rammer --- flyteadmin/pkg/async/cloudevent/factory.go | 10 +++++++--- flyteadmin/pkg/async/cloudevent/factory_test.go | 3 +++ .../runtime/interfaces/application_configuration.go | 4 ++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/flyteadmin/pkg/async/cloudevent/factory.go b/flyteadmin/pkg/async/cloudevent/factory.go index e7b7368f19..a43d8749b5 100644 --- a/flyteadmin/pkg/async/cloudevent/factory.go +++ b/flyteadmin/pkg/async/cloudevent/factory.go @@ -63,15 +63,19 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes) case cloudEventImplementations.Kafka: saramaConfig := sarama.NewConfig() - saramaConfig.Version = config.KafkaConfig.Version + var err error + saramaConfig.Version, err = sarama.ParseKafkaVersion(config.KafkaConfig.Version) + if err != nil { + logger.Fatalf(ctx, "failed to parse kafka version, %v", err) + panic(err) + } sender, err := kafka_sarama.NewSender(config.KafkaConfig.Brokers, saramaConfig, config.EventsPublisherConfig.TopicName) if err != nil { panic(err) } - defer sender.Close(ctx) client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { - logger.Fatalf(ctx, "failed to create client, %v", err) + logger.Fatalf(ctx, "failed to create kafka client, %v", err) panic(err) } return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.KafkaSender{Client: client}, scope, config.EventsPublisherConfig.EventTypes) diff --git a/flyteadmin/pkg/async/cloudevent/factory_test.go b/flyteadmin/pkg/async/cloudevent/factory_test.go index a9b66c38af..00ac69380f 100644 --- a/flyteadmin/pkg/async/cloudevent/factory_test.go +++ b/flyteadmin/pkg/async/cloudevent/factory_test.go @@ -56,7 +56,10 @@ func TestInvalidKafkaConfig(t *testing.T) { Enable: true, Type: implementations.Kafka, EventsPublisherConfig: runtimeInterfaces.EventsPublisherConfig{TopicName: "topic"}, + KafkaConfig: runtimeInterfaces.KafkaConfig{Version: "0.8.2.0"}, } NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope()) + cfg.KafkaConfig = runtimeInterfaces.KafkaConfig{Version: "2.1.0"} + NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope()) t.Errorf("did not panic") } diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index ee053b7a86..16b1f921dc 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -1,7 +1,6 @@ package interfaces import ( - "github.com/Shopify/sarama" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" @@ -205,7 +204,8 @@ type GCPConfig struct { } type KafkaConfig struct { - Version sarama.KafkaVersion + // The version of Kafka, e.g. 2.1.0, 0.8.2.0 + Version string `json:"version"` // kafka broker addresses Brokers []string `json:"brokers"` }