From ef07e646409395153c2501603df9440c404b880a Mon Sep 17 00:00:00 2001 From: Stephan Renatus Date: Wed, 24 Jan 2024 12:22:52 +0100 Subject: [PATCH] redpanda: allow using SASL and TLS together The Admin HTTP client used in the user setup would before fail when trying to talk to the admin API via HTTP. With TLS enabled, it'll have to use https. Signed-off-by: Stephan Renatus --- modules/redpanda/admin_api.go | 13 +++++++-- modules/redpanda/redpanda.go | 34 +++++++++++++++++++----- modules/redpanda/redpanda_test.go | 44 +++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 8 deletions(-) diff --git a/modules/redpanda/admin_api.go b/modules/redpanda/admin_api.go index dbdcf0435c..4f99d5cd08 100644 --- a/modules/redpanda/admin_api.go +++ b/modules/redpanda/admin_api.go @@ -12,10 +12,19 @@ import ( type AdminAPIClient struct { BaseURL string + client *http.Client } func NewAdminAPIClient(baseURL string) *AdminAPIClient { - return &AdminAPIClient{BaseURL: baseURL} + return &AdminAPIClient{ + BaseURL: baseURL, + client: http.DefaultClient, + } +} + +func (cl *AdminAPIClient) WithHTTPClient(c *http.Client) *AdminAPIClient { + cl.client = c + return cl } type createUserRequest struct { @@ -46,7 +55,7 @@ func (cl *AdminAPIClient) CreateUser(ctx context.Context, username, password str } req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) + resp, err := cl.client.Do(req) if err != nil { return fmt.Errorf("request failed: %w", err) } diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 3a30a24e75..9160524226 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -3,9 +3,12 @@ package redpanda import ( "bytes" "context" + "crypto/tls" + "crypto/x509" _ "embed" "fmt" "math" + "net/http" "os" "path/filepath" "text/template" @@ -193,6 +196,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) } + scheme := "http" + if settings.EnableTLS { + scheme += "s" + } + // 9. Create Redpanda Service Accounts if configured to do so. if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) @@ -200,8 +208,27 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Admin API port: %w", err) } - adminAPIUrl := fmt.Sprintf("http://%v:%d", hostIP, adminAPIPort.Int()) + adminAPIUrl := fmt.Sprintf("%s://%v:%d", scheme, hostIP, adminAPIPort.Int()) adminCl := NewAdminAPIClient(adminAPIUrl) + if settings.EnableTLS { + cert, err := tls.X509KeyPair(settings.cert, settings.key) + if err != nil { + return nil, fmt.Errorf("failed to create admin client with cert: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(settings.cert) + adminCl = adminCl.WithHTTPClient(&http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + ForceAttemptHTTP2: true, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + }, + }, + }) + } for username, password := range settings.ServiceAccounts { if err := adminCl.CreateUser(ctx, username, password); err != nil { @@ -210,11 +237,6 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize } } - scheme := "http" - if settings.EnableTLS { - scheme += "s" - } - return &Container{Container: container, urlScheme: scheme}, nil } diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index f33ac4221b..2c84d28c03 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -282,6 +282,50 @@ func TestRedpandaWithTLS(t *testing.T) { require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } +func TestRedpandaWithTLSAndSASL(t *testing.T) { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + require.NoError(t, err, "failed to load key pair") + + ctx := context.Background() + + container, err := RunContainer(ctx, + WithTLS(localhostCert, localhostKey), + WithEnableSASL(), + WithEnableKafkaAuthorization(), + WithNewServiceAccount("superuser-1", "test"), + WithSuperusers("superuser-1"), + ) + require.NoError(t, err) + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(localhostCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + broker, err := container.KafkaSeedBroker(ctx) + require.NoError(t, err) + + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(broker), + kgo.DialTLSConfig(tlsConfig), + kgo.SASL(scram.Auth{ + User: "superuser-1", + Pass: "test", + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + defer kafkaCl.Close() +} + func TestRedpandaListener_Simple(t *testing.T) { ctx := context.Background()