Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

💤 Swap Zipkin server for Zipkin Receiver from OTel Collector Contrib #5045

Merged
merged 15 commits into from
Dec 27, 2023
Merged
1 change: 0 additions & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ ignore:
- "proto-gen/*/*"
- "thrift-gen/*/*"
- "**/thrift-0.9.2/*"
- "swagger-gen/*/*"
- "**/main.go"

coverage:
Expand Down
10 changes: 0 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ DATE=$(shell TZ=UTC0 git show --quiet --date='format-local:%Y-%m-%dT%H:%M:%SZ' -
BUILD_INFO_IMPORT_PATH=$(JAEGER_IMPORT_PATH)/pkg/version
BUILD_INFO=-ldflags "-X $(BUILD_INFO_IMPORT_PATH).commitSHA=$(GIT_SHA) -X $(BUILD_INFO_IMPORT_PATH).latestVersion=$(GIT_CLOSEST_TAG) -X $(BUILD_INFO_IMPORT_PATH).date=$(DATE)"

SWAGGER_VER=0.27.0
SWAGGER_IMAGE=quay.io/goswagger/swagger:v$(SWAGGER_VER)
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/" -w /go/src/ $(SWAGGER_IMAGE)
SWAGGER_GEN_DIR=swagger-gen

MOCKERY=mockery
GOVERSIONINFO=goversioninfo
SYSOFILE=resource.syso
Expand Down Expand Up @@ -468,11 +463,6 @@ test-report:
init-submodules:
git submodule update --init --recursive

.PHONY: generate-zipkin-swagger
generate-zipkin-swagger: init-submodules
$(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go

.PHONY: generate-mocks
generate-mocks: install-tools
$(MOCKERY) --all --dir ./pkg/es/ --output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go
Expand Down
33 changes: 15 additions & 18 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@

// state, read only
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
otlpReceiver receiver.Traces
zipkinReceiver receiver.Traces
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
Expand Down Expand Up @@ -141,20 +141,17 @@
c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS
zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: options.Zipkin.HTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
TLSConfig: options.Zipkin.TLS,
HealthCheck: c.hCheck,
CORSConfig: options.Zipkin.CORS,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
KeepAlive: options.Zipkin.KeepAlive,
})
if err != nil {
return fmt.Errorf("could not start Zipkin server: %w", err)

if options.Zipkin.HTTPHostPort == "" {
c.logger.Info("Not listening for Zipkin HTTP traffic, port not configured")

Check warning on line 146 in cmd/collector/app/collector.go

View check run for this annotation

Codecov / codecov/patch

cmd/collector/app/collector.go#L146

Added line #L146 was not covered by tests
} else {
zipkinReceiver, err := handler.StartZipkinReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
if err != nil {
return fmt.Errorf("could not start Zipkin receiver: %w", err)
}
c.zipkinReceiver = zipkinReceiver

}
c.zkServer = zkServer

if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
Expand Down Expand Up @@ -191,11 +188,11 @@
defer cancel()
}

// Stop Zipkin server
if c.zkServer != nil {
// Stop Zipkin receiver
if c.zipkinReceiver != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.zkServer.Shutdown(timeout); err != nil {
c.logger.Fatal("failed to stop the Zipkin server", zap.Error(err))
if err := c.zipkinReceiver.Shutdown(timeout); err != nil {
c.logger.Fatal("failed to stop the Zipkin receiver", zap.Error(err))

Check warning on line 195 in cmd/collector/app/collector.go

View check run for this annotation

Codecov / codecov/patch

cmd/collector/app/collector.go#L195

Added line #L195 was not covered by tests
}
defer cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestCollector_StartErrors(t *testing.T) {

options = optionsForEphemeralPorts()
options.Zipkin.HTTPHostPort = ":-1"
run("Zipkin", options, "could not start Zipkin server")
run("Zipkin", options, "could not start Zipkin receiver")

options = optionsForEphemeralPorts()
options.OTLP.GRPC.HostPort = ":-1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the existing zipkin_v1_merged_spans.json file (Zipkin JSON v1), converged into object model, and re-serialized as JSON. It uses different representations for many fields, like raw numbers instead of hex strings for IDs.

{
"trace_id": 5679353540597208576,
"name": "get /",
"id": 5679353540597208576,
"annotations": [
{
"timestamp": 1633073248674949,
"value": "sr",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
},
{
"timestamp": 163307324868981,
"value": "ss",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
}
],
"binary_annotations": null,
"debug": false,
"timestamp": 1633073248674949,
"duration": 14861
},
{
"trace_id": 5679353540597208576,
"name": "get /api",
"id": -3944181038374441761,
"parent_id": 5679353540597208576,
"annotations": [
{
"timestamp": 1633073248678309,
"value": "cs",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
},
{
"timestamp": 1633073248681669,
"value": "sr",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
},
{
"timestamp": 1633073248685029,
"value": "cr",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
},
{
"timestamp": 1633073248688388,
"value": "ss",
"host": {
"ipv4": 0,
"port": 0,
"service_name": ""
}
}
],
"binary_annotations": null,
"debug": false,
"timestamp": 1633073248678309,
"duration": 3360
}
]
7 changes: 3 additions & 4 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package handler

import (
"encoding/json"
"errors"
"os"
"testing"
Expand All @@ -26,7 +27,6 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin/zipkindeser"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestZipkinSpanHandler(t *testing.T) {
{
name: "dual client-server span",
expectedErr: nil,
filename: "testdata/zipkin_v1_merged_spans.json",
filename: "testdata/zipkin_thrift_v1_merged_spans.json",
},
}
for _, tc := range tests {
Expand All @@ -116,8 +116,7 @@ func TestZipkinSpanHandler(t *testing.T) {
if tc.filename != "" {
data, err := os.ReadFile(tc.filename)
require.NoError(t, err)
spans, err = zipkindeser.DeserializeJSON(data)
require.NoError(t, err)
require.NoError(t, json.Unmarshal(data, &spans))
} else {
spans = []*zipkincore.Span{
{
Expand Down
89 changes: 89 additions & 0 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2023 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package handler

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
noopmetric "go.opentelemetry.io/otel/metric/noop"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

// StartZipkinReceiver starts Zipkin receiver from OTEL Collector.
func StartZipkinReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
tm *tenancy.Manager,
) (receiver.Traces, error) {
zipkinFactory := zipkinreceiver.NewFactory()
return startZipkinReceiver(
options,
logger,
spanProcessor,
tm,
zipkinFactory,
consumer.NewTraces,
zipkinFactory.CreateTracesReceiver,
)
}

// Some of OTELCOL constructor functions return errors when passed nil arguments,
// which is a situation we cannot reproduce. To test our own error handling, this
// function allows to mock those constructors.
func startZipkinReceiver(
options *flags.CollectorOptions,
logger *zap.Logger,
spanProcessor processor.SpanProcessor,
tm *tenancy.Manager,
// from here: params that can be mocked in tests
zipkinFactory receiver.Factory,
newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error),
createTracesReceiver func(ctx context.Context, set receiver.CreateSettings,
cfg component.Config, nextConsumer consumer.Traces) (receiver.Traces, error),
) (receiver.Traces, error) {
receiverConfig := zipkinFactory.CreateDefaultConfig().(*zipkinreceiver.Config)
applyHTTPSettings(&receiverConfig.HTTPServerSettings, &flags.HTTPOptions{
HostPort: options.Zipkin.HTTPHostPort,
TLS: options.Zipkin.TLS,
CORS: options.HTTP.CORS,
// TODO keepAlive not supported?
})
receiverSettings := receiver.CreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: nooptrace.NewTracerProvider(),
MeterProvider: noopmetric.NewMeterProvider(), // TODO wire this with jaegerlib metrics?
},
}

consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
nextConsumer, err := newTraces(consumerAdapter.consume)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
}
rcvr, err := createTracesReceiver(
context.Background(),
receiverSettings,
receiverConfig,
nextConsumer,
)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin receiver: %w", err)
}
if err := rcvr.Start(context.Background(), &otelHost{logger: logger}); err != nil {
return nil, fmt.Errorf("could not start Zipkin receiver: %w", err)
}
return rcvr, nil
}
Loading
Loading