Skip to content

Commit

Permalink
Update kafka config (flyteorg#499)
Browse files Browse the repository at this point in the history
* Update kafka config

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* update cnofig

Signed-off-by: Kevin Su <[email protected]>

* Update pkg/runtime/interfaces/application_configuration.go

Co-authored-by: Dan Rammer <[email protected]>
Signed-off-by: Kevin Su <[email protected]>

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
  • Loading branch information
pingsutw and hamersaw authored Dec 14, 2022
1 parent 905e7ee commit 6037335
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
10 changes: 7 additions & 3 deletions pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ func NewCloudEventsPublisher(ctx context.Context, config runtimeInterfaces.Cloud
return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.PubSubSender{Pub: publisher}, scope, config.EventsPublisherConfig.EventTypes)
case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
saramaConfig.Version = config.KafkaConfig.Version
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(config.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
sender, err := kafka_sarama.NewSender(config.KafkaConfig.Brokers, saramaConfig, config.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
}
defer sender.Close(ctx)
client, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
logger.Fatalf(ctx, "failed to create client, %v", err)
logger.Fatalf(ctx, "failed to create kafka client, %v", err)
panic(err)
}
return cloudEventImplementations.NewCloudEventsPublisher(&cloudEventImplementations.KafkaSender{Client: client}, scope, config.EventsPublisherConfig.EventTypes)
Expand Down
3 changes: 3 additions & 0 deletions pkg/async/cloudevent/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func TestInvalidKafkaConfig(t *testing.T) {
Enable: true,
Type: implementations.Kafka,
EventsPublisherConfig: runtimeInterfaces.EventsPublisherConfig{TopicName: "topic"},
KafkaConfig: runtimeInterfaces.KafkaConfig{Version: "0.8.2.0"},
}
NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope())
cfg.KafkaConfig = runtimeInterfaces.KafkaConfig{Version: "2.1.0"}
NewCloudEventsPublisher(context.Background(), cfg, promutils.NewTestScope())
t.Errorf("did not panic")
}
4 changes: 2 additions & 2 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package interfaces

import (
"github.com/Shopify/sarama"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/config"
Expand Down Expand Up @@ -205,7 +204,8 @@ type GCPConfig struct {
}

type KafkaConfig struct {
Version sarama.KafkaVersion
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
}
Expand Down

0 comments on commit 6037335

Please sign in to comment.