diff --git a/modules/redpanda/admin_api.go b/modules/redpanda/admin_api.go index dbdcf0435c..4f99d5cd08 100644 --- a/modules/redpanda/admin_api.go +++ b/modules/redpanda/admin_api.go @@ -12,10 +12,19 @@ import ( type AdminAPIClient struct { BaseURL string + client *http.Client } func NewAdminAPIClient(baseURL string) *AdminAPIClient { - return &AdminAPIClient{BaseURL: baseURL} + return &AdminAPIClient{ + BaseURL: baseURL, + client: http.DefaultClient, + } +} + +func (cl *AdminAPIClient) WithHTTPClient(c *http.Client) *AdminAPIClient { + cl.client = c + return cl } type createUserRequest struct { @@ -46,7 +55,7 @@ func (cl *AdminAPIClient) CreateUser(ctx context.Context, username, password str } req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) + resp, err := cl.client.Do(req) if err != nil { return fmt.Errorf("request failed: %w", err) } diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 3a30a24e75..9160524226 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -3,9 +3,12 @@ package redpanda import ( "bytes" "context" + "crypto/tls" + "crypto/x509" _ "embed" "fmt" "math" + "net/http" "os" "path/filepath" "text/template" @@ -193,6 +196,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) } + scheme := "http" + if settings.EnableTLS { + scheme += "s" + } + // 9. Create Redpanda Service Accounts if configured to do so. if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) @@ -200,8 +208,27 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Admin API port: %w", err) } - adminAPIUrl := fmt.Sprintf("http://%v:%d", hostIP, adminAPIPort.Int()) + adminAPIUrl := fmt.Sprintf("%s://%v:%d", scheme, hostIP, adminAPIPort.Int()) adminCl := NewAdminAPIClient(adminAPIUrl) + if settings.EnableTLS { + cert, err := tls.X509KeyPair(settings.cert, settings.key) + if err != nil { + return nil, fmt.Errorf("failed to create admin client with cert: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(settings.cert) + adminCl = adminCl.WithHTTPClient(&http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + ForceAttemptHTTP2: true, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + }, + }, + }) + } for username, password := range settings.ServiceAccounts { if err := adminCl.CreateUser(ctx, username, password); err != nil { @@ -210,11 +237,6 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize } } - scheme := "http" - if settings.EnableTLS { - scheme += "s" - } - return &Container{Container: container, urlScheme: scheme}, nil } diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index f33ac4221b..2c84d28c03 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -282,6 +282,50 @@ func TestRedpandaWithTLS(t *testing.T) { require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } +func TestRedpandaWithTLSAndSASL(t *testing.T) { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + require.NoError(t, err, "failed to load key pair") + + ctx := context.Background() + + container, err := RunContainer(ctx, + WithTLS(localhostCert, localhostKey), + WithEnableSASL(), + WithEnableKafkaAuthorization(), + WithNewServiceAccount("superuser-1", "test"), + WithSuperusers("superuser-1"), + ) + require.NoError(t, err) + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(localhostCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + broker, err := container.KafkaSeedBroker(ctx) + require.NoError(t, err) + + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(broker), + kgo.DialTLSConfig(tlsConfig), + kgo.SASL(scram.Auth{ + User: "superuser-1", + Pass: "test", + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + defer kafkaCl.Close() +} + func TestRedpandaListener_Simple(t *testing.T) { ctx := context.Background()