Skip to content

Commit

Permalink
add optional sasl params to kafka config (#1568)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikekosulin authored Dec 23, 2024
1 parent d2d5fd9 commit c4d203f
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 0 deletions.
22 changes: 22 additions & 0 deletions builtin/bins/dkron-executor-kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) {
}
}

if args.Config["saslUsername"] != "" && args.Config["saslPassword"] != "" {
config.Net.SASL.Enable = true
config.Net.SASL.User = args.Config["saslUsername"]
config.Net.SASL.Password = args.Config["saslPassword"]
config.Net.SASL.Handshake = true

if args.Config["saslMechanism"] == "sha512" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else if args.Config["saslMechanism"] == "sha256" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
} else {
return output.Bytes(), errors.New("invalid SASL mechanism, must be 'sha256' or 'sha512'")
}

}

brokers := strings.Split(args.Config["brokerAddress"], ",")
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions builtin/bins/dkron-executor-kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,51 @@ func TestProduceExecuteWithoutKey(t *testing.T) {
t.Fatal(err)
}
}

func TestProduceExecuteWithSASL_SHA256(t *testing.T) {
pa := &dktypes.ExecuteRequest{
JobName: "testJobWithSASL_SHA256",
Config: map[string]string{
"topic": "test",
"brokerAddress": "testaddress",
"message": "{\"hello\":11}",
"saslUsername": "test",
"saslPassword": "dfdffs",
"saslMechanism": "sha256",
"tlsEnable": "true",
"tlsInsecureSkipVerify": "true",
"debug": "true",
},
}
kafka := &Kafka{}
output, err := kafka.Execute(pa, nil)
fmt.Println(string(output.Output))
fmt.Println(err)
if err != nil {
t.Fatal(err)
}
}

func TestProduceExecuteWithSASL_SHA512(t *testing.T) {
pa := &dktypes.ExecuteRequest{
JobName: "testJobWithSASL_SHA512",
Config: map[string]string{
"topic": "test",
"brokerAddress": "testaddress",
"message": "{\"hello\":11}",
"saslUsername": "test",
"saslPassword": "dfdffs",
"saslMechanism": "sha512",
"tlsEnable": "true",
"tlsInsecureSkipVerify": "true",
"debug": "true",
},
}
kafka := &Kafka{}
output, err := kafka.Execute(pa, nil)
fmt.Println(string(output.Output))
fmt.Println(err)
if err != nil {
t.Fatal(err)
}
}
37 changes: 37 additions & 0 deletions builtin/bins/dkron-executor-kafka/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"crypto/sha256"
"crypto/sha512"

"github.com/xdg-go/scram"
)

var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vmware/govmomi v0.18.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,12 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo=
github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down Expand Up @@ -915,6 +921,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
Expand Down
3 changes: 3 additions & 0 deletions website/docs/usage/executors/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ message: The body of the message to produce
topic: The Kafka topic for this message
tlsEnable: Enables TLS if set to true. Optional
tlsInsecureSkipVerify: Disables verification of the remote SSL certificate's validity if set to true. Optional
saslUsername: The SASL username for authentication. If set, saslPassword and saslMechanism must also be provided.
saslPassword: The SASL password for authentication. If set, saslUsername and saslMechanism must also be provided.
saslMechanism: The SASL SCRAM mechanism to use, either "sha256" or "sha512". This is required if both saslUsername and saslPassword are provided.
debug: Turns on debugging output if not empty
```

Expand Down

0 comments on commit c4d203f

Please sign in to comment.