-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add MetricsQueryService grcp handler #3091
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
// Copyright (c) 2021 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Contains default parameter values used by handlers when optional request parameters are missing. | ||
|
||
package app | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" | ||
) | ||
|
||
var ( | ||
defaultDependencyLookbackDuration = time.Hour * 24 | ||
defaultTraceQueryLookbackDuration = time.Hour * 24 * 2 | ||
defaultMetricsQueryLookbackDuration = time.Hour | ||
defaultMetricsQueryStepDuration = 5 * time.Second | ||
defaultMetricsQueryRateDuration = 10 * time.Minute | ||
defaultMetricsSpanKinds = []string{metrics.SpanKind_SPAN_KIND_SERVER.String()} | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package app | |
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/opentracing/opentracing-go" | ||
"go.uber.org/zap" | ||
|
@@ -24,8 +25,11 @@ import ( | |
|
||
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc" | ||
"github.com/jaegertracing/jaeger/model" | ||
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" //force gogo codec registration | ||
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration | ||
"github.com/jaegertracing/jaeger/plugin/metrics/disabled" | ||
"github.com/jaegertracing/jaeger/proto-gen/api_v2" | ||
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" | ||
"github.com/jaegertracing/jaeger/storage/metricsstore" | ||
"github.com/jaegertracing/jaeger/storage/spanstore" | ||
) | ||
|
||
|
@@ -35,21 +39,30 @@ const ( | |
msgTraceNotFound = "trace not found" | ||
) | ||
|
||
var ( | ||
errGRPCMetricsQueryDisabled = status.Error(codes.Unimplemented, "metrics querying is currently disabled") | ||
errMissingServiceNames = status.Error(codes.InvalidArgument, "please provide at least one service name") | ||
errMissingQuantile = status.Error(codes.InvalidArgument, "please provide a quantile between (0, 1]") | ||
) | ||
|
||
// GRPCHandler implements the gRPC endpoint of the query service. | ||
type GRPCHandler struct { | ||
queryService *querysvc.QueryService | ||
logger *zap.Logger | ||
tracer opentracing.Tracer | ||
queryService *querysvc.QueryService | ||
metricsQueryService querysvc.MetricsQueryService | ||
logger *zap.Logger | ||
tracer opentracing.Tracer | ||
clock clock | ||
} | ||
|
||
// NewGRPCHandler returns a GRPCHandler | ||
func NewGRPCHandler(queryService *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler { | ||
func NewGRPCHandler(queryService *querysvc.QueryService, metricsQueryService querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer, clock clock) *GRPCHandler { | ||
gH := &GRPCHandler{ | ||
queryService: queryService, | ||
logger: logger, | ||
tracer: tracer, | ||
queryService: queryService, | ||
metricsQueryService: metricsQueryService, | ||
logger: logger, | ||
tracer: tracer, | ||
clock: clock, | ||
} | ||
|
||
return gH | ||
} | ||
|
||
|
@@ -177,3 +190,123 @@ func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependen | |
|
||
return &api_v2.GetDependenciesResponse{Dependencies: dependencies}, nil | ||
} | ||
|
||
// GetLatencies is the gRPC handler to fetch latency metrics. | ||
func (g *GRPCHandler) GetLatencies(ctx context.Context, r *metrics.GetLatenciesRequest) (*metrics.GetMetricsResponse, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recently we've seen some tickets showing that apparently with some client libs it's possible to send a request where r==nil, which crashes the server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you already have this check in g.newBaseQueryParameters, but should call this first then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, at least the grpc clients generated in this project fail to marshal a nil message over the wire; but I trust in what you've seen from other client libs, so I've added a nil check for the request. Note, the original nil check in |
||
// Check for cases where clients do not provide the Quantile, which defaults to the float64's zero value. | ||
if r.Quantile == 0 { | ||
return nil, errMissingQuantile | ||
} | ||
bqp, err := g.newBaseQueryParameters(r.BaseRequest) | ||
if err := g.handleErr("failed to build parameters", err); err != nil { | ||
return nil, err | ||
} | ||
queryParams := metricsstore.LatenciesQueryParameters{ | ||
BaseQueryParameters: bqp, | ||
Quantile: r.Quantile, | ||
} | ||
m, err := g.metricsQueryService.GetLatencies(ctx, &queryParams) | ||
if err := g.handleErr("failed to fetch latencies", err); err != nil { | ||
return nil, err | ||
} | ||
return &metrics.GetMetricsResponse{Metrics: *m}, nil | ||
} | ||
|
||
// GetCallRates is the gRPC handler to fetch call rate metrics. | ||
func (g *GRPCHandler) GetCallRates(ctx context.Context, r *metrics.GetCallRatesRequest) (*metrics.GetMetricsResponse, error) { | ||
bqp, err := g.newBaseQueryParameters(r.BaseRequest) | ||
if err := g.handleErr("failed to build parameters", err); err != nil { | ||
return nil, err | ||
} | ||
queryParams := metricsstore.CallRateQueryParameters{ | ||
BaseQueryParameters: bqp, | ||
} | ||
m, err := g.metricsQueryService.GetCallRates(ctx, &queryParams) | ||
if err := g.handleErr("failed to fetch call rates", err); err != nil { | ||
return nil, err | ||
} | ||
return &metrics.GetMetricsResponse{Metrics: *m}, nil | ||
} | ||
|
||
// GetErrorRates is the gRPC handler to fetch error rate metrics. | ||
func (g *GRPCHandler) GetErrorRates(ctx context.Context, r *metrics.GetErrorRatesRequest) (*metrics.GetMetricsResponse, error) { | ||
bqp, err := g.newBaseQueryParameters(r.BaseRequest) | ||
if err := g.handleErr("failed to build parameters", err); err != nil { | ||
return nil, err | ||
} | ||
queryParams := metricsstore.ErrorRateQueryParameters{ | ||
BaseQueryParameters: bqp, | ||
} | ||
m, err := g.metricsQueryService.GetErrorRates(ctx, &queryParams) | ||
if err := g.handleErr("failed to fetch error rates", err); err != nil { | ||
return nil, err | ||
} | ||
return &metrics.GetMetricsResponse{Metrics: *m}, nil | ||
} | ||
|
||
// GetMinStepDuration is the gRPC handler to fetch the minimum step duration supported by the underlying metrics store. | ||
func (g *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *metrics.GetMinStepDurationRequest) (*metrics.GetMinStepDurationResponse, error) { | ||
minStep, err := g.metricsQueryService.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{}) | ||
if err := g.handleErr("failed to fetch min step duration", err); err != nil { | ||
return nil, err | ||
} | ||
return &metrics.GetMinStepDurationResponse{MinStep: minStep}, nil | ||
} | ||
|
||
func (g *GRPCHandler) handleErr(msg string, err error) error { | ||
if err == nil { | ||
return nil | ||
} | ||
g.logger.Error(msg, zap.Error(err)) | ||
|
||
// Avoid wrapping "expected" errors with an "Internal Server" error. | ||
switch { | ||
case errors.Is(err, disabled.ErrDisabled): | ||
return errGRPCMetricsQueryDisabled | ||
case errors.Is(err, errMissingServiceNames), errors.Is(err, errMissingQuantile): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rather than enumerating, could we check if it's already an instance of status.Error? |
||
return err | ||
} | ||
|
||
// Received an "unexpected" error. | ||
return status.Errorf(codes.Internal, "%s: %v", msg, err) | ||
} | ||
|
||
func (g *GRPCHandler) newBaseQueryParameters(r *metrics.MetricsQueryBaseRequest) (bqp metricsstore.BaseQueryParameters, err error) { | ||
if r == nil || len(r.ServiceNames) == 0 { | ||
return bqp, errMissingServiceNames | ||
} | ||
|
||
// Copy non-nullable params. | ||
bqp.GroupByOperation = r.GroupByOperation | ||
bqp.ServiceNames = r.ServiceNames | ||
|
||
// Initialize nullable params with defaults. | ||
defaultEndTime := g.clock.Now() | ||
bqp.EndTime = &defaultEndTime | ||
bqp.Lookback = &defaultMetricsQueryLookbackDuration | ||
bqp.RatePer = &defaultMetricsQueryRateDuration | ||
bqp.SpanKinds = defaultMetricsSpanKinds | ||
bqp.Step = &defaultMetricsQueryStepDuration | ||
|
||
// ... and override defaults with any provided request params. | ||
if r.EndTime != nil { | ||
bqp.EndTime = r.EndTime | ||
} | ||
if r.Lookback != nil { | ||
bqp.Lookback = r.Lookback | ||
} | ||
if r.Step != nil { | ||
bqp.Step = r.Step | ||
} | ||
if r.RatePer != nil { | ||
bqp.RatePer = r.RatePer | ||
} | ||
if len(r.SpanKinds) > 0 { | ||
spanKinds := make([]string, len(r.SpanKinds)) | ||
for i, v := range r.SpanKinds { | ||
spanKinds[i] = v.String() | ||
} | ||
bqp.SpanKinds = spanKinds | ||
} | ||
return bqp, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it worth adopting the functional options pattern here given the list of params is growing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lately I've been trying to stay away from functional options and just use an options/params struct, it's less work to implement and has better discoverability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this concrete example, we don't even need any additional struct, just remove this function and initialize GRPCHandler directly