Skip to content

Commit

Permalink
Add query service with OTLP
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Jul 1, 2021
1 parent 127293d commit ae53586
Show file tree
Hide file tree
Showing 20 changed files with 4,771 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ cmd/docs/*.1
cmd/docs/*.yaml
crossdock/crossdock-*
run-crossdock.log
proto-gen/.patched-otel-proto/
__pycache__

72 changes: 64 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -429,19 +429,18 @@ thrift: idl/thrift/jaeger.thrift thrift-image
rm -rf thrift-gen/*/*-remote thrift-gen/*/*.bak

idl/thrift/jaeger.thrift:
$(MAKE) idl-submodule
$(MAKE) init-submodules

.PHONY: idl-submodule
idl-submodule:
git submodule init
git submodule update
.PHONY: init-submodules
init-submodules:
git submodule update --init --recursive

.PHONY: thrift-image
thrift-image:
$(THRIFT) -version

.PHONY: generate-zipkin-swagger
generate-zipkin-swagger: idl-submodule
generate-zipkin-swagger: init-submodules
$(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go

Expand All @@ -458,10 +457,13 @@ generate-mocks: install-mockery
echo-version:
@echo $(GIT_CLOSEST_TAG)

PROTO_INTERMEDIATE_DIR = proto-gen/.patched-otel-proto
PROTOC := docker run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${JAEGER_DOCKER_PROTOBUF} --proto_path=${PWD}
PROTO_INCLUDES := \
-Iidl/proto/api_v2 \
-Iidl/proto/api_v3 \
-Imodel/proto/metrics \
-I$(PROTO_INTERMEDIATE_DIR) \
-I/usr/include/github.com/gogo/protobuf
# Remapping of std types to gogo types (must not contain spaces)
PROTO_GOGO_MAPPINGS := $(shell echo \
Expand All @@ -473,9 +475,8 @@ PROTO_GOGO_MAPPINGS := $(shell echo \
Mmodel.proto=github.com/jaegertracing/jaeger/model \
| sed 's/ //g')


.PHONY: proto
proto:
proto: init-submodules proto-prepare-otel
# Generate gogo, swagger, go-validators, gRPC-storage-plugin output.
#
# -I declares import folders, in order of importance
Expand Down Expand Up @@ -552,6 +553,61 @@ proto:
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/zipkin \
idl/proto/zipkin.proto

$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/common/v1/common.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/resource/v1/resource.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,paths=source_relative,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/otel \
$(PROTO_INTERMEDIATE_DIR)/trace/v1/trace.proto

# Target proto-prepare-otel modifies OTEL proto to use import path jaeger.proto.*
# The modification is needed because OTEL collector already uses opentelemetry.proto.*
# and two complied protobuf types cannot have the same import path. The root cause is that the compiled OTLP
# in the collector is in private package, hence it cannot be used in Jaeger.
# The following statements revert changes in OTEL proto and only modify go package.
# This way the service will use opentelemetry.proto.trace.v1.ResourceSpans but in reality at runtime
# it uses jaeger.proto.trace.v1.ResourceSpans which is the same type in a different package which
# prevents panic of two equal proto types.
rm -rf $(PROTO_INTERMEDIATE_DIR)/*
cp -R idl/opentelemetry-proto/* $(PROTO_INTERMEDIATE_DIR)
find $(PROTO_INTERMEDIATE_DIR) -name "*.proto" | xargs -L 1 sed -i 's+github.com/open-telemetry/opentelemetry-proto/gen/go+github.com/jaegertracing/jaeger/proto-gen/otel+g'
$(PROTOC) \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--grpc-gateway_out=logtostderr=true,grpc_api_configuration=idl/proto/api_v3/query_service_http.yaml,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
$(PROTOC) \
$(PROTO_INCLUDES) \
--swagger_out=disable_default_errors=true,logtostderr=true,grpc_api_configuration=idl/proto/api_v3/query_service_http.yaml:$(PWD)/proto-gen/api_v3 \
idl/proto/api_v3/query_service.proto
rm -rf $(PROTO_INTERMEDIATE_DIR)

.PHONY: proto-prepare-otel
proto-prepare-otel:
@echo --
@echo -- Copying to $(PROTO_INTERMEDIATE_DIR)
@echo --
mkdir -p $(PROTO_INTERMEDIATE_DIR)
cp -R idl/opentelemetry-proto/opentelemetry/proto/* $(PROTO_INTERMEDIATE_DIR)

@echo --
@echo -- Editing proto
@echo --
@# Change:
@# go import from github.com/open-telemetry/opentelemetry-proto/gen/go/* to github.com/jaegertracing/jaeger/proto-gen/otel/*
@# proto package from opentelemetry.proto.* to jaeger.proto.*
@# remove import opentelemetry/proto
find $(PROTO_INTERMEDIATE_DIR) -name "*.proto" | xargs -L 1 sed -i -f otel_proto_patch.sed

.PHONY: proto-hotrod
proto-hotrod:
$(PROTOC) \
Expand Down
37 changes: 37 additions & 0 deletions cmd/query/app/apiv3/grpc_gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiv3

import (
"context"
"net/http"

"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/proto-gen/api_v3"
)

// RegisterGRPCGateway registers api_v3 endpoints into provided mux.
func RegisterGRPCGateway(r *mux.Router, basePath string, grpcEndpoint string) error {
jsonpb := &runtime.JSONPb{}
grpcGatewayMux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb),
)
r.PathPrefix("/v3/").Handler(http.StripPrefix(basePath, grpcGatewayMux))
opts := []grpc.DialOption{grpc.WithInsecure()}
return api_v3.RegisterQueryServiceHandlerFromEndpoint(context.Background(), grpcGatewayMux, grpcEndpoint, opts)
}
100 changes: 100 additions & 0 deletions cmd/query/app/apiv3/grpc_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiv3

import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"testing"

"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" //force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

func TestGRPCGateway(t *testing.T) {
r := &spanstoremocks.Reader{}
traceID := model.NewTraceID(150, 160)
r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return(
&model.Trace{
Spans: []*model.Span{
{
TraceID: traceID,
SpanID: model.NewSpanID(180),
OperationName: "foobar",
},
},
}, nil).Once()

q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{})
server := grpc.NewServer()
h := &Handler{
QueryService: q,
}
api_v3.RegisterQueryServiceServer(server, h)

lis, _ := net.Listen("tcp", ":0")
go func() {
err := server.Serve(lis)
require.NoError(t, err)
}()

router := &mux.Router{}
err := RegisterGRPCGateway(router, "", lis.Addr().String())
require.NoError(t, err)

httpLis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
go func() {
err = http.Serve(httpLis, router)
require.NoError(t, err)
defer httpLis.Close()
}()
req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1)), nil)
req.Header.Set("Content-Type", "application/json")
response, err := http.DefaultClient.Do(req)
buf := bytes.Buffer{}
_, err = buf.ReadFrom(response.Body)
require.NoError(t, err)

jsonpb := &runtime.JSONPb{}
var envelope envelope
err = json.Unmarshal(buf.Bytes(), &envelope)
require.NoError(t, err)
var spansResponse api_v3.SpansResponseChunk
err = jsonpb.Unmarshal(envelope.Result, &spansResponse)
require.NoError(t, err)
assert.Equal(t, 1, len(spansResponse.GetResourceSpans()))
assert.Equal(t, uint64ToTraceID(traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetInstrumentationLibrarySpans()[0].GetSpans()[0].GetTraceId())
}

// see https://github.com/grpc-ecosystem/grpc-gateway/issues/2189
type envelope struct {
Result json.RawMessage `json:"result"`
}
133 changes: 133 additions & 0 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiv3

import (
"context"

"github.com/gogo/protobuf/types"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Handler implements api_v3.QueryServiceServer
type Handler struct {
QueryService *querysvc.QueryService
}

var _ api_v3.QueryServiceServer = (*Handler)(nil)

// GetTrace implements api_v3.QueryServiceServer's GetTrace
func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryService_GetTraceServer) error {
traceID, err := model.TraceIDFromString(request.GetTraceId())
if err != nil {
return err
}

trace, err := h.QueryService.GetTrace(stream.Context(), traceID)
if err != nil {
return err
}
resourceSpans := jaegerSpansToOTLP(trace.GetSpans())
return stream.Send(&api_v3.SpansResponseChunk{
ResourceSpans: resourceSpans,
})
}

// FindTraces implements api_v3.QueryServiceServer's FindTraces
func (h *Handler) FindTraces(request *api_v3.FindTracesRequest, stream api_v3.QueryService_FindTracesServer) error {
query := request.GetQuery()
queryParams := &spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetNumTraces()),
}
if query.GetStartTimeMin() != nil {
startTimeMin, err := types.TimestampFromProto(query.GetStartTimeMin())
if err != nil {
return err
}
queryParams.StartTimeMin = startTimeMin
}
if query.GetStartTimeMax() != nil {
startTimeMax, err := types.TimestampFromProto(query.GetStartTimeMax())
if err != nil {
return err
}
queryParams.StartTimeMax = startTimeMax
}
if query.GetDurationMin() != nil {
durationMin, err := types.DurationFromProto(query.GetDurationMin())
if err != nil {
return err
}
queryParams.DurationMin = durationMin
}
if query.GetStartTimeMax() != nil {
durationMax, err := types.DurationFromProto(query.GetDurationMax())
if err != nil {
return err
}
queryParams.DurationMax = durationMax
}

traces, err := h.QueryService.FindTraces(stream.Context(), queryParams)
if err != nil {
return err
}
for _, t := range traces {
resourceSpans := jaegerSpansToOTLP(t.GetSpans())
stream.Send(&api_v3.SpansResponseChunk{
ResourceSpans: resourceSpans,
})
}
return nil
}

// GetServices implements api_v3.QueryServiceServer's GetServices
func (h *Handler) GetServices(ctx context.Context, _ *api_v3.GetServicesRequest) (*api_v3.GetServicesResponse, error) {
services, err := h.QueryService.GetServices(ctx)
if err != nil {
return nil, err
}
return &api_v3.GetServicesResponse{
Services: services,
}, nil
}

// GetOperations implements api_v3.QueryService's GetOperations
func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperationsRequest) (*api_v3.GetOperationsResponse, error) {
operations, err := h.QueryService.GetOperations(ctx, spanstore.OperationQueryParameters{
ServiceName: request.GetService(),
SpanKind: request.GetSpanKind(),
})
if err != nil {
return nil, err
}
apiOperations := make([]*api_v3.Operation, len(operations))
for i := range operations {
apiOperations[i] = &api_v3.Operation{
Name: operations[i].Name,
SpanKind: operations[i].SpanKind,
}
}
return &api_v3.GetOperationsResponse{
Operations: apiOperations,
}, nil
}
Loading

0 comments on commit ae53586

Please sign in to comment.