Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add kafka exporter and receiver configuration #4971

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ignore:
- "thrift-gen/*/*"
- "**/thrift-0.9.2/*"
- "**/main.go"
- "cmd/jaeger/integration"
- "examples/hotrod"

coverage:
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/ci-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ permissions: # added using https://github.com/step-security/secure-workflows
jobs:
kafka:
runs-on: ubuntu-latest
strategy:
matrix:
name: [v1, v2]
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

steps:
- name: Harden Runner
uses: step-security/harden-runner@eb238b55efaa70779f274895e782ed17c84f2895 # v2.6.1
Expand All @@ -31,7 +35,15 @@ jobs:
go-version: 1.21.x

- name: Run kafka integration tests
run: bash scripts/kafka-integration-test.sh -k
run: |
case ${{ matrix.name }} in
v1)
bash scripts/kafka-integration-test.sh -k
;;
v2)
bash scripts/otel-kafka-integration-test.sh 3.6
;;
esac

- name: Output Kafka logs
run: docker logs kafka
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cmd/collector/collector
cmd/collector/collector-*
cmd/ingester/ingester
cmd/ingester/ingester-*
cmd/jaeger/integration/results
cmd/remote-storage/remote-storage
cmd/remote-storage/remote-storage-*
cmd/es-index-cleaner/es-index-cleaner-*
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
SHELL := /bin/bash
JAEGER_IMPORT_PATH = github.com/jaegertracing/jaeger
STORAGE_PKGS = ./plugin/storage/integration/...
OTEL_INTEGRATION_PATH = ./cmd/jaeger/integration/...

# These DOCKER_xxx vars are used when building Docker images.
DOCKER_NAMESPACE?=jaegertracing
Expand Down Expand Up @@ -141,6 +142,13 @@ index-rollover-integration-test: docker-images-elastic
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -tags index_rollover -coverpkg=./... -coverprofile cover-index-rollover.out $(STORAGE_PKGS) $(COLORIZE)"

.PHONY: otel-integration-test
otel-integration-test:
# Expire tests results for storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -coverpkg=./... -coverprofile cover.out $(OTEL_INTEGRATION_PATH) $(COLORIZE)"

.PHONY: cover
cover: nocover
bash -c "set -e; set -o pipefail; $(GOTEST) -tags=memory_storage_integration -timeout 5m -coverprofile cover.out ./... | tee test-results.json"
Expand Down
30 changes: 30 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
31 changes: 31 additions & 0 deletions cmd/jaeger/ingester-with-remote.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
service:
extensions: [jaeger_storage]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml
telemetry:
metrics:
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml

extensions:
jaeger_storage:
grpc:
memstore:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't know it's a memstore in this config, let's call it 'external-storage'

server: localhost:17271
connection-timeout: 5s

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
initial_offset: earliest # consume messages from the beginning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should parameterize this and default to latest checkpoint rather than earliest, since it would be very bad to run with earliest in production. The integration tests can override the value via env var.


processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
32 changes: 32 additions & 0 deletions cmd/jaeger/ingester.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
service:
extensions: [jaeger_storage]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml
telemetry:
metrics:
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml

extensions:
jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
initial_offset: earliest # consume messages from the beginning

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
16 changes: 16 additions & 0 deletions cmd/jaeger/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Integration

Jaeger v2 integration tests are built on top of [OTEL Testbed module](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/testbed). OTEL Testbed provide comprehensive tools for conducting end-to-end tests for the OTEL Collector, such as reproducible short-term benchmarks, correctness tests, long-running stability tests and maximum load stress tests. To learn more about OTEL Testbed, please refer to the their [README](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/testbed/README.md)

## kafka_test

Kafka e2e test checks if the pipelines through `kafka` and finally at `remote-storage` have stored match exactly with the provided data using `GoldenDataProvider` (Provides data from the "Golden" dataset generated using pairwise combinatorial testing a.k.a PICT techniques for use in correctness tests) and validated using `CorrectnessTestValidator`.

The pipelines are checked in 2 steps, which the first test case verifies if the spans sent to Kafka are correct, and the second one checks the spans stored in the remote storage.
![kafka diagram](kafka_diagram.jpeg)

To conduct the tests, run the following command:

```
scripts/otel-kafka-integration-test.sh [kafka_version=latest] [remote_storage_version=latest]
```
1 change: 1 addition & 0 deletions cmd/jaeger/integration/datareceivers/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FIXME
59 changes: 59 additions & 0 deletions cmd/jaeger/integration/datareceivers/jaegerstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package datareceivers

import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/jaegertracing/jaeger/cmd/jaeger/integration/receivers/storagereceiver"
)

type jaegerStorageDataReceiver struct {
Port int
receiver receiver.Traces
}

func NewJaegerStorageDataReceiver(port int) testbed.DataReceiver {
return &jaegerStorageDataReceiver{Port: port}
}

func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
factory := storagereceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*storagereceiver.Config)
cfg.GRPC.RemoteServerAddr = fmt.Sprintf("localhost:%d", dr.Port)
cfg.GRPC.RemoteConnectTimeout = time.Duration(5 * time.Second)
// TODO add support for other backends

var err error
set := receivertest.NewNopCreateSettings()
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return err
}

return dr.receiver.Start(context.Background(), componenttest.NewNopHost())
}

func (dr *jaegerStorageDataReceiver) Stop() error {
return dr.receiver.Shutdown(context.Background())
}

func (dr *jaegerStorageDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
jaeger_storage_receiver:
grpc:
server: localhost:%d`, dr.Port)
}

func (dr *jaegerStorageDataReceiver) ProtocolName() string {
return "jaeger_storage_receiver"
}
57 changes: 57 additions & 0 deletions cmd/jaeger/integration/datareceivers/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package datareceivers
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
)

type kafkaDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Traces
}

func NewKafkaDataReceiver(port int) testbed.DataReceiver {
return &kafkaDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}}
}

func (dr *kafkaDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
factory := kafkareceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config)
cfg.Brokers = []string{fmt.Sprintf("localhost:%d", dr.Port)}
cfg.GroupID = "testbed_collector"

var err error
set := receivertest.NewNopCreateSettings()
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return err
}

return dr.receiver.Start(context.Background(), componenttest.NewNopHost())
}

func (dr *kafkaDataReceiver) Stop() error {
return dr.receiver.Shutdown(context.Background())
}

func (dr *kafkaDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
kafka:
brokers:
- localhost:%d
encoding: otlp_proto`, dr.Port)
}

func (dr *kafkaDataReceiver) ProtocolName() string {
return "kafka"
}
Loading
Loading