Skip to content

Commit

Permalink
📈 add framework for send gRPC client metrics
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Apr 9, 2024
1 parent 9bf4b6c commit a532b7d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 15 deletions.
7 changes: 4 additions & 3 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ const (
RestartPolicyAlways RestartPolicy = "Always"
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"
)

const (
volumeName = "vald-benchmark-job-config"
configMapName = "vald-benchmark-operator-config"
svcAccount = "vald-benchmark-operator"
)

var (
mode = int32(420)
)

type BenchmarkJobTpl interface {
CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error)
}
Expand Down Expand Up @@ -175,7 +177,6 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err
}
// mount benchmark operator config map.
// It is used for bind only observability config for each benchmark job
mode := int32(420)
b.jobTpl.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
Expand Down
79 changes: 79 additions & 0 deletions internal/net/grpc/interceptor/client/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric

import (
"context"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
"github.com/vdaas/vald/internal/observability/metrics"
"google.golang.org/grpc"
)

const (
latencyMetricsName = "client_latency"
completedRPCsMetricsName = "client_completed_rpcs"

gRPCMethodKeyName = "grpc_client_method"
gRPCStatus = "grpc_client_status"
)

func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor, error) {
meter := metrics.GetMeter()

Check warning on line 37 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L36-L37

Added lines #L36 - L37 were not covered by tests

latencyHistgram, err := meter.Float64Histogram(
latencyMetricsName,
metrics.WithDescription("Client latency in milliseconds, by method"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create latency metric")

Check warning on line 45 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L39-L45

Added lines #L39 - L45 were not covered by tests
}

completedRPCCnt, err := meter.Int64Counter(
completedRPCsMetricsName,
metrics.WithDescription("Count of RPCs by method and status"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create completedRPCs metric")

Check warning on line 54 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L48-L54

Added lines #L48 - L54 were not covered by tests
}

record := func(ctx context.Context, method string, err error, latency float64) {
attrs := attributesFromError(method, err)
latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...))
completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...))

Check warning on line 60 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L57-L60

Added lines #L57 - L60 were not covered by tests
}
// FIXME: implement sending metric data
log.Debug(record)
return nil, nil, nil

Check warning on line 64 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

func attributesFromError(method string, err error) []attribute.KeyValue {
code := codes.OK // default error is success when error is nil
if err != nil {
st, _ := status.FromError(err)
if st != nil {
code = st.Code()

Check warning on line 72 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L67-L72

Added lines #L67 - L72 were not covered by tests
}
}
return []attribute.KeyValue{
attribute.String(gRPCMethodKeyName, method),
attribute.String(gRPCStatus, code.String()),

Check warning on line 77 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L75-L77

Added lines #L75 - L77 were not covered by tests
}
}
12 changes: 12 additions & 0 deletions internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (

"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/circuitbreaker"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/metric"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/trace"
"github.com/vdaas/vald/internal/strings"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -353,6 +355,16 @@ func WithClientInterceptors(names ...string) Option {
grpc.WithUnaryInterceptor(trace.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(trace.StreamClientInterceptor()),
)
case "metricinterceptor", "metric":
uci, sci, err := metric.ClientMetricInterceptors()
if err != nil {
lerr := errors.NewErrCriticalOption("gRPCInterceptors", "metric", errors.Wrap(err, "failed to create interceptor"))
log.Warn(lerr.Error())

Check warning on line 362 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L358-L362

Added lines #L358 - L362 were not covered by tests
}
g.dopts = append(g.dopts,
grpc.WithUnaryInterceptor(uci),
grpc.WithStreamInterceptor(sci),
)

Check warning on line 367 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L364-L367

Added lines #L364 - L367 were not covered by tests
default:
}
}
Expand Down
35 changes: 23 additions & 12 deletions pkg/tools/benchmark/job/usecase/benchmarkd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,34 @@ func New(cfg *config.Config) (r runner.Runner, err error) {
}
}

// bind metrics interceptor
var clientInterceptors []string
var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err

Check warning on line 76 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L68-L76

Added lines #L68 - L76 were not covered by tests
}
var str []string
str = append(str, "metric")
if cfg.Observability.Trace.Enabled {
str = append(str, "trace")

Check warning on line 81 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}
clientInterceptors = str

Check warning on line 83 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L83

Added line #L83 was not covered by tests
}

copts, err := cfg.Job.ClientConfig.Opts()
if err != nil {
return nil, err
}
if cfg.Job.ClientConfig.DialOption == nil {
copts = append(copts, grpc.WithInsecure(true))
copts = append(copts,
grpc.WithInsecure(true),
grpc.WithClientInterceptors(clientInterceptors...),
)

Check warning on line 94 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L91-L94

Added lines #L91 - L94 were not covered by tests
}
gcli := grpc.New(copts...)
vcli, err := vald.New(
Expand Down Expand Up @@ -131,17 +153,6 @@ func New(cfg *config.Config) (r runner.Runner, err error) {
}),
}

var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err
}
}

srv, err := starter.New(
starter.WithConfig(cfg.Server),
starter.WithREST(func(sc *iconf.Server) []server.Option {
Expand Down

0 comments on commit a532b7d

Please sign in to comment.