From ee57516fc6737e6ea6244b26a6f9534948b8e67a Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 31 Oct 2024 00:05:13 +0300 Subject: [PATCH] add optional sasl params to kafka config --- builtin/bins/dkron-executor-kafka/kafka.go | 22 +++++++++ .../bins/dkron-executor-kafka/kafka_test.go | 48 +++++++++++++++++++ .../bins/dkron-executor-kafka/scram_client.go | 37 ++++++++++++++ go.mod | 3 ++ go.sum | 7 +++ website/docs/usage/executors/kafka.md | 3 ++ 6 files changed, 120 insertions(+) create mode 100644 builtin/bins/dkron-executor-kafka/scram_client.go diff --git a/builtin/bins/dkron-executor-kafka/kafka.go b/builtin/bins/dkron-executor-kafka/kafka.go index e819307a5..6bab02894 100644 --- a/builtin/bins/dkron-executor-kafka/kafka.go +++ b/builtin/bins/dkron-executor-kafka/kafka.go @@ -77,6 +77,28 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { } } + if args.Config["saslUsername"] != "" && args.Config["saslPassword"] != "" { + config.Net.SASL.Enable = true + config.Net.SASL.User = args.Config["saslUsername"] + config.Net.SASL.Password = args.Config["saslPassword"] + config.Net.SASL.Handshake = true + + if args.Config["saslMechanism"] == "sha512" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } else if args.Config["saslMechanism"] == "sha256" { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + } else { + return output.Bytes(), errors.New("invalid SASL mechanism, must be 'sha256' or 'sha512'") + } + + } + brokers := strings.Split(args.Config["brokerAddress"], ",") producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { diff --git a/builtin/bins/dkron-executor-kafka/kafka_test.go b/builtin/bins/dkron-executor-kafka/kafka_test.go index 8486d2fa9..0d4f66d17 100644 --- a/builtin/bins/dkron-executor-kafka/kafka_test.go +++ b/builtin/bins/dkron-executor-kafka/kafka_test.go @@ -45,3 +45,51 @@ func TestProduceExecuteWithoutKey(t *testing.T) { t.Fatal(err) } } + +func TestProduceExecuteWithSASL_SHA256(t *testing.T) { + pa := &dktypes.ExecuteRequest{ + JobName: "testJobWithSASL_SHA256", + Config: map[string]string{ + "topic": "test", + "brokerAddress": "testaddress", + "message": "{\"hello\":11}", + "saslUsername": "test", + "saslPassword": "dfdffs", + "saslMechanism": "sha256", + "tlsEnable": "true", + "tlsInsecureSkipVerify": "true", + "debug": "true", + }, + } + kafka := &Kafka{} + output, err := kafka.Execute(pa, nil) + fmt.Println(string(output.Output)) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} + +func TestProduceExecuteWithSASL_SHA512(t *testing.T) { + pa := &dktypes.ExecuteRequest{ + JobName: "testJobWithSASL_SHA512", + Config: map[string]string{ + "topic": "test", + "brokerAddress": "testaddress", + "message": "{\"hello\":11}", + "saslUsername": "test", + "saslPassword": "dfdffs", + "saslMechanism": "sha512", + "tlsEnable": "true", + "tlsInsecureSkipVerify": "true", + "debug": "true", + }, + } + kafka := &Kafka{} + output, err := kafka.Execute(pa, nil) + fmt.Println(string(output.Output)) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} diff --git a/builtin/bins/dkron-executor-kafka/scram_client.go b/builtin/bins/dkron-executor-kafka/scram_client.go new file mode 100644 index 000000000..5ccf92a2f --- /dev/null +++ b/builtin/bins/dkron-executor-kafka/scram_client.go @@ -0,0 +1,37 @@ +package main + +import ( + "crypto/sha256" + "crypto/sha512" + + "github.com/xdg-go/scram" +) + +var ( + SHA256 scram.HashGeneratorFcn = sha256.New + SHA512 scram.HashGeneratorFcn = sha512.New +) + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/go.mod b/go.mod index 5366a668b..c3eb3c3eb 100644 --- a/go.mod +++ b/go.mod @@ -204,6 +204,9 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vmware/govmomi v0.18.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect diff --git a/go.sum b/go.sum index f236a2b20..b9e0a21d2 100644 --- a/go.sum +++ b/go.sum @@ -694,6 +694,12 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo= github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -914,6 +920,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= diff --git a/website/docs/usage/executors/kafka.md b/website/docs/usage/executors/kafka.md index a4eda3f77..7ec574c86 100644 --- a/website/docs/usage/executors/kafka.md +++ b/website/docs/usage/executors/kafka.md @@ -13,6 +13,9 @@ message: The body of the message to produce topic: The Kafka topic for this message tlsEnable: Enables TLS if set to true. Optional tlsInsecureSkipVerify: Disables verification of the remote SSL certificate's validity if set to true. Optional +saslUsername: The SASL username for authentication. If set, saslPassword and saslMechanism must also be provided. +saslPassword: The SASL password for authentication. If set, saslUsername and saslMechanism must also be provided. +saslMechanism: The SASL SCRAM mechanism to use, either "sha256" or "sha512". This is required if both saslUsername and saslPassword are provided. debug: Turns on debugging output if not empty ```