Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run v2 e2e integration tests for Kafka #5782

Merged
merged 8 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/ci-e2e-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ permissions: # added using https://github.com/step-security/secure-workflows
jobs:
kafka:
runs-on: ubuntu-latest
strategy:
matrix:
jaeger-version: [v1, v2] # Adjust if there are specific versions of Jaeger
name: Kafka Integration Tests ${{ matrix.jaeger-version }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@17d0e2bd7d51742c71671bd19fa12bdc9d40a3d6 # v2.8.1
Expand All @@ -30,9 +34,9 @@ jobs:
with:
go-version: 1.22.x

- name: Run kafka integration tests
- name: Run Kafka integration tests
id: test-execution
run: bash scripts/kafka-integration-test.sh -k
run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }}

- name: Output Kafka logs on failure
run: docker compose -f ${{ steps.test-execution.outputs.docker_compose_file }} logs
Expand All @@ -42,4 +46,4 @@ jobs:
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: kafka
flags: kafka-${{ matrix.jaeger-version }}
7 changes: 6 additions & 1 deletion cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ service:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

telemetry:
resource:
service.name: jaeger_collector
metrics:
level: detailed

receivers:
otlp:
protocols:
Expand Down
32 changes: 24 additions & 8 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ const otlpPort = 4317
// - At last, clean up anything declared in its own test functions.
// (e.g. close remote-storage)
type E2EStorageIntegration struct {
SkipStorageCleaner bool
integration.StorageIntegration
ConfigFile string

SkipStorageCleaner bool
ConfigFile string
HealthCheckEndpoint string
}

// e2eInitialize starts the Jaeger-v2 collector with the provided config file,
Expand All @@ -51,6 +53,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
if !s.SkipStorageCleaner {
configFile = createStorageCleanerConfig(t, s.ConfigFile, storage)
}

configFile, err := filepath.Abs(configFile)
require.NoError(t, err, "Failed to get absolute path of the config file")
require.FileExists(t, configFile, "Config file does not exist at the resolved path")

t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -79,22 +86,31 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
Stdout: outFile,
Stderr: errFile,
}
t.Logf("Running command: %v", cmd.Args)
require.NoError(t, cmd.Start())

// Wait for the binary to start and become ready to serve requests.
healthCheckEndpoint := s.HealthCheckEndpoint
if healthCheckEndpoint == "" {
healthCheckEndpoint = fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
}
require.Eventually(t, func() bool {
url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
t.Logf("Checking if Jaeger-v2 is available on %s", url)
t.Logf("Checking if Jaeger-v2 is available on %s", healthCheckEndpoint)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil)
if err != nil {
t.Logf("HTTP request creation failed: %v", err)
return false
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Log(err)
t.Logf("HTTP request failed: %v", err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
}, 60*time.Second, 3*time.Second, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
if err := cmd.Process.Kill(); err != nil {
Expand Down
13 changes: 11 additions & 2 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ import (
func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

// TODO these config files use topic: "jaeger-spans",
// but for integration tests we want to use random topic in each run.
// https://github.com/jaegertracing/jaeger/blob/ed5cc2981c34158d0650cb96cb2fafcb753bea70/plugin/storage/integration/kafka_test.go#L50-L51
// Once OTEL Collector supports default values for env vars
// (https://github.com/open-telemetry/opentelemetry-collector/issues/5228)
// we can change the config to use topic: "${KAFKA_TOPIC:-jaeger-spans}"
// and export a KAFKA_TOPIC var with random topic name in the tests.

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
HealthCheckEndpoint: "http://localhost:8888/metrics",
}

// Initialize and start the collector
Expand Down
112 changes: 84 additions & 28 deletions scripts/kafka-integration-test.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,44 +1,100 @@
#!/bin/bash

set -e
set -euf -o pipefail

export STORAGE=kafka
compose_file="docker-compose/kafka-integration-test/docker-compose.yml"
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"

# Check if the -k parameter is provided and start Kafka if it was
if [ "$1" == "-k" ]; then
echo "Starting Kafka using Docker Compose..."
docker compose -f "${compose_file}" up -d kafka
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"
fi
jaeger_version=""
manage_kafka="true"

print_help() {
echo "Usage: $0 [-K] -j <jaeger_version>"
echo " -K: do not start or stop Kafka container (useful for local testing)"
echo " -j: major version of Jaeger to test (v1|v2)"
exit 1
}

parse_args() {
while getopts "j:Kh" opt; do
case "${opt}" in
j)
jaeger_version=${OPTARG}
;;
K)
manage_kafka="false"
;;
*)
print_help
;;
esac
done
if [ "$jaeger_version" != "v1" ] && [ "$jaeger_version" != "v2" ]; then
echo "Error: Invalid Jaeger version. Valid options are v1 or v2"
print_help
fi
}

setup_kafka() {
echo "Starting Kafka using Docker Compose..."
docker compose -f "${compose_file}" up -d kafka
}

teardown_kafka() {
echo "Stopping Kafka..."
docker compose -f "${compose_file}" down
}

# Check if Kafka is ready by attempting to list topics
is_kafka_ready() {
docker compose -f "${compose_file}" \
exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
>/dev/null 2>&1
docker compose -f "${compose_file}" \
exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 \
>/dev/null 2>&1
}

# Set the timeout in seconds
timeout=180
# Set the interval between checks in seconds
interval=5
# Calculate the end time
end_time=$((SECONDS + timeout))
wait_for_kafka() {
local timeout=180
local interval=5
local end_time=$((SECONDS + timeout))

while [ $SECONDS -lt $end_time ]; do
while [ $SECONDS -lt $end_time ]; do
if is_kafka_ready; then
break
return
fi
echo "Kafka broker not ready, waiting ${interval} seconds"
sleep $interval
done
done

if ! is_kafka_ready; then
echo "Timed out waiting for Kafka to start"
exit 1
fi
echo "Timed out waiting for Kafka to start"
exit 1
}

run_integration_test() {
export STORAGE=kafka
if [ "${jaeger_version}" = "v1" ]; then
make storage-integration-test
elif [ "${jaeger_version}" = "v2" ]; then
make jaeger-v2-storage-integration-test
else
echo "Unknown Jaeger version ${jaeger_version}."
print_help
fi
}

main() {
parse_args "$@"

echo "Executing Kafka integration test for version $2"
set -x

if [[ "$manage_kafka" == "true" ]]; then
setup_kafka
trap 'teardown_kafka' EXIT
fi
wait_for_kafka

run_integration_test
}

make storage-integration-test
main "$@"
Loading