diff --git a/pkg/async/cloudevent/factory.go b/pkg/async/cloudevent/factory.go index e7b7368f19..a43d8749b5 100644 --- a/pkg/async/cloudevent/factory.go +++ b/pkg/async/cloudevent/factory.go @@ -63,15 +63,19 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes) case cloudEventImplementations.Kafka: saramaConfig := sarama.NewConfig() - saramaConfig.Version = config.KafkaConfig.Version + var err error + saramaConfig.Version, err = sarama.ParseKafkaVersion(config.KafkaConfig.Version) + if err != nil { + logger.Fatalf(ctx, "failed to parse kafka version, %v", err) + panic(err) + } sender, err := kafka_sarama.NewSender(config.KafkaConfig.Brokers, saramaConfig, config.EventsPublisherConfig.TopicName) if err != nil { panic(err) } - defer sender.Close(ctx) client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { - logger.Fatalf(ctx, "failed to create client, %v", err) + logger.Fatalf(ctx, "failed to create kafka client, %v", err) panic(err) } return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.KafkaSender{Client: client}, scope, config.EventsPublisherConfig.EventTypes) diff --git a/pkg/async/cloudevent/factory_test.go b/pkg/async/cloudevent/factory_test.go index a9b66c38af..00ac69380f 100644 --- a/pkg/async/cloudevent/factory_test.go +++ b/pkg/async/cloudevent/factory_test.go @@ -56,7 +56,10 @@ func TestInvalidKafkaConfig(t *testing.T) { Enable: true, Type: implementations.Kafka, EventsPublisherConfig: runtimeInterfaces.EventsPublisherConfig{TopicName: "topic"}, + KafkaConfig: runtimeInterfaces.KafkaConfig{Version: "0.8.2.0"}, } NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope()) + cfg.KafkaConfig = runtimeInterfaces.KafkaConfig{Version: "2.1.0"} + NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope()) t.Errorf("did not panic") } diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index ee053b7a86..16b1f921dc 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -1,7 +1,6 @@ package interfaces import ( - "github.com/Shopify/sarama" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" @@ -205,7 +204,8 @@ type GCPConfig struct { } type KafkaConfig struct { - Version sarama.KafkaVersion + // The version of Kafka, e.g. 2.1.0, 0.8.2.0 + Version string `json:"version"` // kafka broker addresses Brokers []string `json:"brokers"` }