From 9906369e67af60fdb245b65fc866f139f24d6f07 Mon Sep 17 00:00:00 2001 From: Mathieu Morlon Date: Mon, 8 Nov 2021 23:42:29 +0100 Subject: [PATCH 1/2] Compression codec --- pkg/config/env.go | 33 +++++++++++++++--------------- pkg/handler/data_recorder_kafka.go | 2 ++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/config/env.go b/pkg/config/env.go index 1a8f6d48..41ca9806 100644 --- a/pkg/config/env.go +++ b/pkg/config/env.go @@ -128,22 +128,23 @@ var Config = struct { RecorderFrameOutputMode string `env:"FLAGR_RECORDER_FRAME_OUTPUT_MODE" envDefault:"payload_string"` // Kafka related configurations for data records logging (Flagr Metrics) - RecorderKafkaVersion string `env:"FLAGR_RECORDER_KAFKA_VERSION" envDefault:"0.8.2.0"` - RecorderKafkaBrokers string `env:"FLAGR_RECORDER_KAFKA_BROKERS" envDefault:":9092"` - RecorderKafkaCertFile string `env:"FLAGR_RECORDER_KAFKA_CERTFILE" envDefault:""` - RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""` - RecorderKafkaCAFile string `env:"FLAGR_RECORDER_KAFKA_CAFILE" envDefault:""` - RecorderKafkaVerifySSL bool `env:"FLAGR_RECORDER_KAFKA_VERIFYSSL" envDefault:"false"` - RecorderKafkaSimpleSSL bool `env:"FLAGR_RECORDER_KAFKA_SIMPLE_SSL" envDefault:"false"` - RecorderKafkaSASLUsername string `env:"FLAGR_RECORDER_KAFKA_SASL_USERNAME" envDefault:""` - RecorderKafkaSASLPassword string `env:"FLAGR_RECORDER_KAFKA_SASL_PASSWORD" envDefault:""` - RecorderKafkaVerbose bool `env:"FLAGR_RECORDER_KAFKA_VERBOSE" envDefault:"true"` - RecorderKafkaTopic string `env:"FLAGR_RECORDER_KAFKA_TOPIC" envDefault:"flagr-records"` - RecorderKafkaRetryMax int `env:"FLAGR_RECORDER_KAFKA_RETRYMAX" envDefault:"5"` - RecorderKafkaRequiredAcks int16 `env:"FLAGR_RECORDER_KAFKA_REQUIRED_ACKS" envDefault:"1"` // 0: no response, 1: wait for local, -1: wait for all - RecorderKafkaFlushFrequency time.Duration `env:"FLAGR_RECORDER_KAFKA_FLUSHFREQUENCY" envDefault:"500ms"` - RecorderKafkaEncrypted bool `env:"FLAGR_RECORDER_KAFKA_ENCRYPTED" envDefault:"false"` - RecorderKafkaEncryptionKey string `env:"FLAGR_RECORDER_KAFKA_ENCRYPTION_KEY" envDefault:""` + RecorderKafkaVersion string `env:"FLAGR_RECORDER_KAFKA_VERSION" envDefault:"0.8.2.0"` + RecorderKafkaBrokers string `env:"FLAGR_RECORDER_KAFKA_BROKERS" envDefault:":9092"` + RecorderKafkaCompressionCodec int8 `env:"FLAGR_RECORDER_KAFKA_COMPRESSION_CODEC" envDefault:":0"` + RecorderKafkaCertFile string `env:"FLAGR_RECORDER_KAFKA_CERTFILE" envDefault:""` + RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""` + RecorderKafkaCAFile string `env:"FLAGR_RECORDER_KAFKA_CAFILE" envDefault:""` + RecorderKafkaVerifySSL bool `env:"FLAGR_RECORDER_KAFKA_VERIFYSSL" envDefault:"false"` + RecorderKafkaSimpleSSL bool `env:"FLAGR_RECORDER_KAFKA_SIMPLE_SSL" envDefault:"false"` + RecorderKafkaSASLUsername string `env:"FLAGR_RECORDER_KAFKA_SASL_USERNAME" envDefault:""` + RecorderKafkaSASLPassword string `env:"FLAGR_RECORDER_KAFKA_SASL_PASSWORD" envDefault:""` + RecorderKafkaVerbose bool `env:"FLAGR_RECORDER_KAFKA_VERBOSE" envDefault:"true"` + RecorderKafkaTopic string `env:"FLAGR_RECORDER_KAFKA_TOPIC" envDefault:"flagr-records"` + RecorderKafkaRetryMax int `env:"FLAGR_RECORDER_KAFKA_RETRYMAX" envDefault:"5"` + RecorderKafkaRequiredAcks int16 `env:"FLAGR_RECORDER_KAFKA_REQUIRED_ACKS" envDefault:"1"` // 0: no response, 1: wait for local, -1: wait for all + RecorderKafkaFlushFrequency time.Duration `env:"FLAGR_RECORDER_KAFKA_FLUSHFREQUENCY" envDefault:"500ms"` + RecorderKafkaEncrypted bool `env:"FLAGR_RECORDER_KAFKA_ENCRYPTED" envDefault:"false"` + RecorderKafkaEncryptionKey string `env:"FLAGR_RECORDER_KAFKA_ENCRYPTION_KEY" envDefault:""` // Kinesis related configurations for data records logging (Flagr Metrics) RecorderKinesisStreamName string `env:"FLAGR_RECORDER_KINESIS_STREAM_NAME" envDefault:"flagr-records"` diff --git a/pkg/handler/data_recorder_kafka.go b/pkg/handler/data_recorder_kafka.go index eb099122..112c4ad4 100644 --- a/pkg/handler/data_recorder_kafka.go +++ b/pkg/handler/data_recorder_kafka.go @@ -31,6 +31,7 @@ func mustParseKafkaVersion(version string) sarama.KafkaVersion { // NewKafkaRecorder creates a new Kafka recorder var NewKafkaRecorder = func() DataRecorder { cfg := sarama.NewConfig() + tlscfg := createTLSConfiguration( config.Config.RecorderKafkaCertFile, config.Config.RecorderKafkaKeyFile, @@ -49,6 +50,7 @@ var NewKafkaRecorder = func() DataRecorder { cfg.Net.SASL.Password = config.Config.RecorderKafkaSASLPassword } + cfg.Producer.Compression = sarama.CompressionCodec(config.Config.RecorderKafkaCompressionCodec) cfg.Producer.RequiredAcks = sarama.RequiredAcks(config.Config.RecorderKafkaRequiredAcks) cfg.Producer.Retry.Max = config.Config.RecorderKafkaRetryMax cfg.Producer.Flush.Frequency = config.Config.RecorderKafkaFlushFrequency From fad677c41b787bb09b5f93aafb0849feb1e9e806 Mon Sep 17 00:00:00 2001 From: glutamatt Date: Mon, 22 Nov 2021 18:53:48 +0100 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Zhuojie Zhou --- pkg/config/env.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/env.go b/pkg/config/env.go index 41ca9806..f1f59e62 100644 --- a/pkg/config/env.go +++ b/pkg/config/env.go @@ -130,7 +130,7 @@ var Config = struct { // Kafka related configurations for data records logging (Flagr Metrics) RecorderKafkaVersion string `env:"FLAGR_RECORDER_KAFKA_VERSION" envDefault:"0.8.2.0"` RecorderKafkaBrokers string `env:"FLAGR_RECORDER_KAFKA_BROKERS" envDefault:":9092"` - RecorderKafkaCompressionCodec int8 `env:"FLAGR_RECORDER_KAFKA_COMPRESSION_CODEC" envDefault:":0"` + RecorderKafkaCompressionCodec int8 `env:"FLAGR_RECORDER_KAFKA_COMPRESSION_CODEC" envDefault:"0"` RecorderKafkaCertFile string `env:"FLAGR_RECORDER_KAFKA_CERTFILE" envDefault:""` RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""` RecorderKafkaCAFile string `env:"FLAGR_RECORDER_KAFKA_CAFILE" envDefault:""`