Skip to content

Commit

Permalink
[jaeger-v2] Use environment variables in Kafka config (#6028)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #6027 

## Description of the changes
- Leverages the functionality added in
open-telemetry/opentelemetry-collector#5228 to
remove the logic for rewriting the config and simply setting an
environment variable for the Kafka integration tests.

## How was this change tested?
- The change is to an integration test so the CI passing is a good test
for this change

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Sep 29, 2024
1 parent be48a84 commit 5598400
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 62 deletions.
4 changes: 2 additions & 2 deletions cmd/jaeger/config-kafka-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ exporters:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
topic: ${env:KAFKA_TOPIC:-jaeger-spans}
encoding: ${env:KAFKA_ENCODING:-otlp_proto}
4 changes: 2 additions & 2 deletions cmd/jaeger/config-kafka-ingester.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ receivers:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
topic: ${env:KAFKA_TOPIC:-jaeger-spans}
encoding: ${env:KAFKA_ENCODING:-otlp_proto}
initial_offset: earliest

processors:
Expand Down
10 changes: 10 additions & 0 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ type E2EStorageIntegration struct {
ConfigFile string
BinaryName string
HealthCheckEndpoint string

// EnvVarOverrides contains a map of environment variables to set.
// The key in the map is the environment variable to override and the value
// is the value of the environment variable to set.
// These variables are set upon initialization and are unset upon cleanup.
EnvVarOverrides map[string]string
}

// Binary is a wrapper around exec.Cmd to help running binaries in tests.
Expand Down Expand Up @@ -107,6 +113,10 @@ func (b *Binary) Start(t *testing.T) {
// it also initialize the SpanWriter and SpanReader below.
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
// set environment variable overrides
for key, value := range s.EnvVarOverrides {
t.Setenv(key, value)
}
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
if s.BinaryName == "" {
s.BinaryName = "jaeger-v2"
Expand Down
66 changes: 8 additions & 58 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,12 @@ package integration

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

// createConfigWithEncoding rewrites the base configuration files to use the given encoding
// and Kafka topic which are varied between the test runs.
//
// Once OTEL Collector supports default values for env vars
// (https://github.com/open-telemetry/opentelemetry-collector/issues/5228)
// we can change the config to use topic: "${KAFKA_TOPIC:-jaeger-spans}"
// and export a KAFKA_TOPIC var with random topic name in the tests.
func createConfigWithEncoding(t *testing.T, configFile string, targetEncoding string, uniqueTopic string) string {
data, err := os.ReadFile(configFile)
require.NoError(t, err, "Failed to read config file: %s", configFile)

var config map[string]any
err = yaml.Unmarshal(data, &config)
require.NoError(t, err, "Failed to unmarshal YAML data from config file: %s", configFile)

// Function to recursively search and replace the encoding and topic
var replaceEncodingAndTopic func(m map[string]any) int
replaceEncodingAndTopic = func(m map[string]any) int {
replacements := 0
for k, v := range m {
if k == "encoding" {
oldEncoding := v.(string)
m[k] = targetEncoding
t.Logf("Replaced encoding '%s' with '%s' in key: %s", oldEncoding, targetEncoding, k)
replacements++
} else if k == "topic" {
oldTopic := v.(string)
m[k] = uniqueTopic
t.Logf("Replaced topic '%s' with '%s' in key: %s", oldTopic, uniqueTopic, k)
replacements++
} else if subMap, ok := v.(map[string]any); ok {
replacements += replaceEncodingAndTopic(subMap)
}
}
return replacements
}

totalReplacements := replaceEncodingAndTopic(config)
require.Equal(t, 2, totalReplacements, "Expected exactly 2 replacements (encoding and topic), but got %d", totalReplacements)

newData, err := yaml.Marshal(config)
require.NoError(t, err, "Failed to marshal YAML data after encoding replacement")

tempFile := filepath.Join(t.TempDir(), fmt.Sprintf("config_%s.yaml", targetEncoding))
err = os.WriteFile(tempFile, newData, 0o600)
require.NoError(t, err, "Failed to write updated config file to: %s", tempFile)

t.Logf("Transformed configuration file %s to %s", configFile, tempFile)
return tempFile
}

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

Expand All @@ -93,24 +37,30 @@ func TestKafkaStorage(t *testing.T) {
// has access to the storage and allows the test to query it.
// We reuse E2EStorageIntegration struct to manage lifecycle of the collector,
// but the tests are run against the ingester.
envVarOverrides := map[string]string{
"KAFKA_TOPIC": uniqueTopic,
"KAFKA_ENCODING": test.encoding,
}

collector := &E2EStorageIntegration{
BinaryName: "jaeger-v2-collector",
ConfigFile: createConfigWithEncoding(t, "../../config-kafka-collector.yaml", test.encoding, uniqueTopic),
ConfigFile: "../../config-kafka-collector.yaml",
SkipStorageCleaner: true,
EnvVarOverrides: envVarOverrides,
}
collector.e2eInitialize(t, "kafka")
t.Log("Collector initialized")

ingester := &E2EStorageIntegration{
BinaryName: "jaeger-v2-ingester",
ConfigFile: createConfigWithEncoding(t, "../../config-kafka-ingester.yaml", test.encoding, uniqueTopic),
ConfigFile: "../../config-kafka-ingester.yaml",
HealthCheckEndpoint: "http://localhost:14133/status",
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
EnvVarOverrides: envVarOverrides,
}
ingester.e2eInitialize(t, "kafka")
t.Log("Ingester initialized")
Expand Down

0 comments on commit 5598400

Please sign in to comment.