Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka exporter and receiver configuration #5703

Merged
merged 30 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f02e2fc
Add configs for collector and ingester
joeyyy09 Jul 3, 2024
a728c2e
Update collector config
joeyyy09 Jul 10, 2024
fc7da59
Update ingester config
joeyyy09 Jul 14, 2024
53173c5
Add initial kafka_test.go
joeyyy09 Jul 14, 2024
b2133cc
Add debug logs in the config
joeyyy09 Jul 15, 2024
2ccff53
Update conig files for UI
joeyyy09 Jul 15, 2024
c9c32cc
Update conig files
joeyyy09 Jul 16, 2024
802413a
Add kafka e2e setup
joeyyy09 Jul 18, 2024
819ee21
Add kafka e2e setup
joeyyy09 Jul 18, 2024
d02136e
Add Readme for Kafka
joeyyy09 Jul 18, 2024
3cf0151
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
d09eaa4
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
0acf969
Update cmd/jaeger/internal/integration/README.md
joeyyy09 Jul 19, 2024
17ebd78
Fixes
joeyyy09 Jul 19, 2024
dfcba73
Fix broken syntax
joeyyy09 Jul 19, 2024
fe794c1
Add injectStorageCleaner parameter
joeyyy09 Jul 19, 2024
6ab1285
fix merge conflicts
joeyyy09 Jul 19, 2024
66ef1f9
fix merge conflicts
joeyyy09 Jul 19, 2024
4d4a882
Modify E2EStorageIntegration struct
joeyyy09 Jul 20, 2024
7cc6a6f
Fixes
joeyyy09 Jul 20, 2024
29581a4
Fixes
joeyyy09 Jul 20, 2024
580bdc9
Update Lint
joeyyy09 Jul 20, 2024
fea62c6
Fix Lint
joeyyy09 Jul 20, 2024
284acc9
Add e2eCleanup in e2eInitialize
joeyyy09 Jul 21, 2024
4ba5917
Fixes
joeyyy09 Jul 24, 2024
06e68ac
Lint fixes
joeyyy09 Jul 24, 2024
07f903e
fix ui
yurishkuro Jul 24, 2024
46121f5
Merge branch 'main' into kafka-config
yurishkuro Jul 24, 2024
4d624d6
schedule clean-up after reader/writer are created
yurishkuro Jul 24, 2024
d7c70cf
remove duplicate call to cleanup
yurishkuro Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:
jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
37 changes: 37 additions & 0 deletions cmd/jaeger/ingester-remote-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889
logs:
level: debug

extensions:
jaeger_query:
trace_storage: some_storage

jaeger_storage:
backends:
some_storage:
memory:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: some_storage
50 changes: 50 additions & 0 deletions cmd/jaeger/internal/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,56 @@ flowchart LR
end
```

## Kafka Integration

The primary difference between the Kafka integration tests and other integration tests lies in the flow of data. In the standard tests, spans are written by the SpanWriter, sent through an RPC_client directly to a receiver, then to an exporter, and written to a storage backend. Spans are read by the SpanReader, which queries the jaeger_query process accessing the storage backend. In contrast, the Kafka tests introduce Kafka as an intermediary. Spans go from the SpanWriter through an RPC_client to an OTLP receiver in the Jaeger Collector, exported to Kafka, received by the Jaeger Ingester, and then stored. For details, see the [Architecture](#KafkaArchitecture) section below.


## Kafka Architecture

``` mermaid
flowchart LR
Test -->|writeSpan| SpanWriter
SpanWriter --> RPCW[RPC_client]
RPCW --> OTLP_Receiver[Receiver]
OTLP_Receiver --> CollectorExporter[Kafka Exporter]
CollectorExporter --> Kafka[Kafka]
Kafka --> IngesterReceiver[Kafka Receiver]
IngesterReceiver --> IngesterExporter[Exporter]
IngesterExporter --> StorageBackend[(In-Memory Store)]
Test -->|readSpan| SpanReader
SpanReader --> RPCR[RPC_client]
RPCR --> QueryProcess[jaeger_query]
StorageCleaner -->|purge| StorageBackend
QueryProcess --> StorageBackend


subgraph Integration_Test_Executable
Test
SpanWriter
SpanReader
RPCW
RPCR
end

subgraph Jaeger Collector
OTLP_Receiver
CollectorExporter
end

subgraph Jaeger Ingester
IngesterReceiver
IngesterExporter
QueryProcess
StorageBackend
StorageCleaner[Storage Cleaner Extension]
end

subgraph Kafka
Topic
end
```

## Running tests locally

All integration tests can be run locally.
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestBadgerStorage(t *testing.T) {
GetOperationsMissingSpanKind: true,
},
}
s.e2eInitialize(t, "badger")
s.e2eInitialize(t, "badger", true)
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved
t.Cleanup(func() {
s.e2eCleanUp(t)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestCassandraStorage(t *testing.T) {
SkipList: integration.CassandraSkippedTests,
},
}
s.e2eInitialize(t, "cassandra")
s.e2eInitialize(t, "cassandra", true)
t.Cleanup(func() {
s.e2eCleanUp(t)
})
Expand Down
44 changes: 30 additions & 14 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type E2EStorageIntegration struct {
// e2eInitialize starts the Jaeger-v2 collector with the provided config file,
// it also initialize the SpanWriter and SpanReader below.
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string, injectStorageCleaner bool) {
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage, injectStorageCleaner)
t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -124,28 +124,44 @@ func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

func createStorageCleanerConfig(t *testing.T, configFile string, storage string) string {
func createStorageCleanerConfig(t *testing.T, configFile string, storage string, injectStorageCleaner bool) string {
data, err := os.ReadFile(configFile)
require.NoError(t, err)

var config map[string]any
err = yaml.Unmarshal(data, &config)
require.NoError(t, err)

serviceAny, ok := config["service"]
require.True(t, ok)
service := serviceAny.(map[string]any)
service["extensions"] = append(service["extensions"].([]any), "storage_cleaner")

// Ensure extensions are correctly parsed before any modifications
extensionsAny, ok := config["extensions"]
require.True(t, ok)
extensions := extensionsAny.(map[string]any)
queryAny, ok := extensions["jaeger_query"]
require.True(t, ok)
traceStorageAny, ok := queryAny.(map[string]any)["trace_storage"]
require.True(t, ok)
traceStorage := traceStorageAny.(string)
extensions["storage_cleaner"] = map[string]string{"trace_storage": traceStorage}


if injectStorageCleaner{
// Add storage_cleaner to the service extensions
serviceAny, ok := config["service"]
require.True(t, ok)
service := serviceAny.(map[string]any)

// Ensure the extensions key is present and is a slice
if extList, ok := service["extensions"].([]any); ok {
service["extensions"] = append(extList, "storage_cleaner")
} else {
service["extensions"] = []any{"storage_cleaner"}
}

// Ensure jaeger_query is correctly parsed
queryAny, ok := extensions["jaeger_query"]
require.True(t, ok)
query := queryAny.(map[string]any)
trace_storage, ok := query["trace_storage"].(string)
require.True(t, ok)

extensions["storage_cleaner"] = map[string]string{"trace_storage": trace_storage}
}

// Ensure jaeger_storage is correctly parsed
jaegerStorageAny, ok := extensions["jaeger_storage"]
require.True(t, ok)
jaegerStorage := jaegerStorageAny.(map[string]any)
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import "testing"
func TestCreateStorageCleanerConfig(t *testing.T) {
// Ensure that we can parse the existing configs correctly.
// This is faster to run than the full integration test.
createStorageCleanerConfig(t, "../../config-elasticsearch.yaml", "elasticsearch")
createStorageCleanerConfig(t, "../../config-opensearch.yaml", "opensearch")
createStorageCleanerConfig(t, "../../config-elasticsearch.yaml", "elasticsearch", true)
createStorageCleanerConfig(t, "../../config-opensearch.yaml", "opensearch", true)
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestElasticsearchStorage(t *testing.T) {
GetOperationsMissingSpanKind: true,
},
}
s.e2eInitialize(t, "elasticsearch")
s.e2eInitialize(t, "elasticsearch", true)
t.Cleanup(func() {
s.e2eCleanUp(t)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGRPCStorage(t *testing.T) {
}
s.CleanUp = s.cleanUp
s.initialize(t)
s.e2eInitialize(t, "grpc")
s.e2eInitialize(t, "grpc", true)
t.Cleanup(func() {
s.e2eCleanUp(t)
s.remoteStorage.Close(t)
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
50 changes: 50 additions & 0 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
ConfigFile: collectorConfig,
StorageIntegration: integration.StorageIntegration{
joeyyy09 marked this conversation as resolved.
Show resolved Hide resolved
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}

// Initialize and start the collector
collector.e2eInitialize(t, "kafka-collector", false)


ingester := &E2EStorageIntegration{
ConfigFile: ingesterConfig,
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}

// Initialize and start the ingester
ingester.e2eInitialize(t, "kafka-ingester", true)

// Set up cleanup for both collector and ingester
t.Cleanup(func() {
collector.e2eCleanUp(t)
ingester.e2eCleanUp(t)
})

// Run the span store tests
ingester.RunSpanStoreTests(t)
}
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestMemoryStorage(t *testing.T) {
CleanUp: purge,
},
}
s.e2eInitialize(t, "memory")
s.e2eInitialize(t, "memory", true)
t.Cleanup(func() {
s.e2eCleanUp(t)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestOpenSearchStorage(t *testing.T) {
GetOperationsMissingSpanKind: true,
},
}
s.e2eInitialize(t, "opensearch")
s.e2eInitialize(t, "opensearch", true)
t.Cleanup(func() {
s.e2eCleanUp(t)
})
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var fixtures embed.FS
// - in those functions it instantiates and populates this struct
// - it then calls RunAll.
//
// Some implementations may declate multuple tests, with different settings,
// Some implementations may declare multiple tests, with different settings,
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
Expand Down Expand Up @@ -541,4 +541,4 @@ func (s *StorageIntegration) RunSpanStoreTests(t *testing.T) {
t.Run("GetTrace", s.testGetTrace)
t.Run("GetLargeSpans", s.testGetLargeSpan)
t.Run("FindTraces", s.testFindTraces)
}
}
Loading