Skip to content

Commit

Permalink
OTLP receiver
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed May 26, 2022
1 parent 1f4d1d2 commit 8ae69d1
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 25 deletions.
24 changes: 21 additions & 3 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"time"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
Expand All @@ -49,6 +51,7 @@ type Collector struct {
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
otlpReceiver component.TracesReceiver
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
Expand Down Expand Up @@ -106,7 +109,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
return fmt.Errorf("could not start gRPC collector: %w", err)
}
c.grpcServer = grpcServer

Expand All @@ -120,7 +123,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start the HTTP server %w", err)
return fmt.Errorf("could not start the HTTP server: %w", err)
}
c.hServer = httpServer

Expand All @@ -138,10 +141,16 @@ func (c *Collector) Start(options *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
})
if err != nil {
return fmt.Errorf("could not start the Zipkin server %w", err)
return fmt.Errorf("could not start the Zipkin server: %w", err)
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(c.logger, c.spanProcessor)
if err != nil {
return err
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

return nil
Expand Down Expand Up @@ -178,6 +187,15 @@ func (c *Collector) Close() error {
defer cancel()
}

// OpenTelemetry OTLP receiver
if c.otlpReceiver != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.otlpReceiver.Shutdown(timeout); err != nil {
c.logger.Fatal("failed to stop the OTLP receiver", zap.Error(err))
}
defer cancel()
}

if err := c.spanProcessor.Close(); err != nil {
c.logger.Error("failed to close span processor.", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand Down
108 changes: 108 additions & 0 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/otel"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// A delegation function to assist in tests, because ProtoFromTraces never returns errors despite its API.
var protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error) = otlp2jaeger.ProtoFromTraces

var _ component.Host = (*otelHost)(nil) // API check

type OtelReceiverOptions struct {
GRPCAddress string
HTTPAddress string
}

// StartOtelReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports.
func StartOtelReceiver(logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) {
otlpFactory := otlpreceiver.NewFactory()
otlpReceiverConfig := otlpFactory.CreateDefaultConfig()
otlpReceiverSettings := component.ReceiverCreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: otel.GetTracerProvider(), // TODO we may always want no-op here, not the global default
},
}
// TODO re-implement the logic of NewGRPCHandler, it's fairly trivial
jaegerBatchHandler := NewGRPCHandler(logger, spanProcessor)
nextConsumer, err := consumer.NewTraces(consumer.ConsumeTracesFunc(func(ctx context.Context, ld ptrace.Traces) error {
batches, err := protoFromTraces(ld)
if err != nil {
return err
}
for _, batch := range batches {
// TODO generate metrics
_, err := jaegerBatchHandler.PostSpans(ctx, &api_v2.PostSpansRequest{
Batch: *batch,
})
if err != nil {
return err
}
}
return nil
}))
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
otlpReceiver, err := otlpFactory.CreateTracesReceiver(
context.Background(),
otlpReceiverSettings,
otlpReceiverConfig,
nextConsumer,
)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP receiver: %w", err)
}
err = otlpReceiver.Start(context.Background(), &otelHost{logger: logger})
if err != nil {
return nil, fmt.Errorf("could not start the OTLP receiver: %w", err)
}
return otlpReceiver, nil
}

// otelHost is a mostly no-op implementation of OTEL component.Host
type otelHost struct {
logger *zap.Logger
}

func (h *otelHost) ReportFatalError(err error) {
h.logger.Fatal("OTLP receiver error", zap.Error(err))
}
func (*otelHost) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
return nil
}
func (*otelHost) GetExtensions() map[config.ComponentID]component.Extension {
return nil
}
func (*otelHost) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return nil
}
32 changes: 32 additions & 0 deletions cmd/collector/app/handler/otlp_receover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler

import (
"context"
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/stretchr/testify/require"
)

func TestOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
rec, err := StartOtelReceiver(logger, spanProcessor)
require.NoError(t, err)
err = rec.Shutdown(context.Background())
require.NoError(t, err)
}
9 changes: 5 additions & 4 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
}
c := app.New(&app.CollectorParams{
collector := app.New(&app.CollectorParams{
ServiceName: serviceName,
Logger: logger,
MetricsFactory: metricsFactory,
Expand All @@ -110,12 +110,13 @@ func main() {
if err != nil {
logger.Fatal("Failed to initialize collector", zap.Error(err))
}
if err := c.Start(collectorOpts); err != nil {
// Start all Collector services
if err := collector.Start(collectorOpts); err != nil {
logger.Fatal("Failed to start collector", zap.Error(err))
}

// Wait for shutfown
svc.RunAndThen(func() {
if err := c.Close(); err != nil {
if err := collector.Close(); err != nil {
logger.Error("failed to cleanly close the collector", zap.Error(err))
}
if closer, ok := spanWriter.(io.Closer); ok {
Expand Down
25 changes: 19 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/hashicorp/go-plugin v1.4.4
github.com/kr/pretty v0.3.0
github.com/olivere/elastic v6.2.37+incompatible
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.52.0
github.com/opentracing-contrib/go-grpc v0.0.0-20191001143057-db30781987df
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
Expand All @@ -43,8 +44,10 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/xdg-go/scram v1.1.1
go.opentelemetry.io/collector v0.52.0
go.opentelemetry.io/collector/pdata v0.52.0
go.opentelemetry.io/collector/semconv v0.52.0
go.opentelemetry.io/otel v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/zap v1.21.0
Expand All @@ -59,7 +62,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand All @@ -73,6 +76,8 @@ require (
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.21.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
Expand All @@ -81,7 +86,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -94,21 +98,26 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/klauspost/compress v1.15.4 // indirect
github.com/knadh/koanf v1.4.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.16 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.13.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.52.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
Expand All @@ -120,15 +129,19 @@ require (
github.com/rogpeppe/go-internal v1.6.2 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
go.mongodb.org/mongo-driver v1.8.3 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
Expand Down
Loading

0 comments on commit 8ae69d1

Please sign in to comment.