From cc88e34a33658d81290e0019556f16e6f7815acc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 2 Jun 2022 15:44:49 +0900 Subject: [PATCH 1/7] add backoff metrics Signed-off-by: hlts2 --- internal/backoff/backoff.go | 30 +++++++ internal/backoff/context.go | 37 +++++++++ .../client/v1/client/discoverer/discover.go | 1 + internal/net/grpc/client.go | 3 + internal/net/grpc/context.go | 37 +++++++++ .../observability/metrics/backoff/backoff.go | 79 +++++++++++++++++++ internal/runner/runner_race_test.go | 2 +- pkg/gateway/lb/handler/grpc/handler.go | 20 ++--- pkg/gateway/lb/usecase/vald.go | 7 +- pkg/manager/index/service/indexer.go | 10 +-- pkg/manager/index/usecase/indexer.go | 11 ++- 11 files changed, 217 insertions(+), 20 deletions(-) create mode 100644 internal/backoff/context.go create mode 100644 internal/net/grpc/context.go create mode 100644 internal/observability/metrics/backoff/backoff.go diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 7eb7436acc..3177aff73e 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -30,6 +30,13 @@ import ( "github.com/vdaas/vald/internal/rand" ) +// NOTE: This variable is for observability package. +// This will be fixed when refactoring the observability package. +var ( + mu sync.RWMutex + metrics map[string]int64 = make(map[string]int64) +) + type backoff struct { wg sync.WaitGroup backoffFactor float64 @@ -86,6 +93,7 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter dur := b.initialDuration jdur := b.jittedInitialDuration + name := FromBackoffName(ctx) dctx, cancel := context.WithDeadline(sctx, time.Now().Add(b.backoffTimeLimit)) defer cancel() @@ -119,6 +127,13 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter if b.errLog { log.Error(err) } + // e.g. name = vald.v1.Exists/ip ...etc + if len(name) != 0 { + mu.Lock() + metrics[name] += 1 + mu.Unlock() + } + timer.Reset(time.Duration(jdur)) select { case <-dctx.Done(): @@ -153,3 +168,18 @@ func (b *backoff) addJitter(dur float64) float64 { func (b *backoff) Close() { b.wg.Wait() } + +func Metrics(_ context.Context) map[string]int64 { + mu.RLock() + defer mu.RUnlock() + + if len(metrics) == 0 { + return nil + } + + m := make(map[string]int64, len(metrics)) + for name, cnt := range metrics { + m[name] = cnt + } + return m +} diff --git a/internal/backoff/context.go b/internal/backoff/context.go new file mode 100644 index 0000000000..8d41c35bf8 --- /dev/null +++ b/internal/backoff/context.go @@ -0,0 +1,37 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package backoff + +import "context" + +type contextKey string + +const backoffNameContextKey contextKey = "backoff_name" + +// WithBackoffName returns a copy of parent in which the method associated with key (backoffNameContextKey). +func WithBackoffName(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, backoffNameContextKey, name) +} + +// FromBackoffName returns the value associated with this context for key (backoffNameContextKey). +func FromBackoffName(ctx context.Context) string { + if val := ctx.Value(backoffNameContextKey); val != nil { + if name, ok := val.(string); ok { + return name + } + } + return "" +} diff --git a/internal/client/v1/client/discoverer/discover.go b/internal/client/v1/client/discoverer/discover.go index ed01041ccd..8b408e1cae 100644 --- a/internal/client/v1/client/discoverer/discover.go +++ b/internal/client/v1/client/discoverer/discover.go @@ -218,6 +218,7 @@ func (c *client) dnsDiscovery(ctx context.Context, ech chan<- error) (addrs []st } func (c *client) discover(ctx context.Context, ech chan<- error) (err error) { + ctx = grpc.WithGRPCMethod(ctx, "discoverer.v1.Discoverer/Nodes") if c.dscClient == nil || (c.autoconn && c.client == nil) { return errors.ErrGRPCClientNotFound } diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index 9186eeece2..0fabc9e0e0 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -550,6 +550,9 @@ func (g *gRPCClient) do(ctx context.Context, p pool.Conn, addr string, enableBac } }() if g.bo != nil && enableBackoff { + if method := FromGRPCMethod(sctx); len(method) != 0 { + sctx = backoff.WithBackoffName(ctx, method+"/"+addr) + } data, err = g.bo.Do(sctx, func(ictx context.Context) (r interface{}, ret bool, err error) { err = p.Do(func(conn *ClientConn) (err error) { if conn == nil { diff --git a/internal/net/grpc/context.go b/internal/net/grpc/context.go new file mode 100644 index 0000000000..6696519b11 --- /dev/null +++ b/internal/net/grpc/context.go @@ -0,0 +1,37 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package grpc + +import "context" + +type contextKey string + +const grpcMethodContextKey contextKey = "grpc_method" + +// WithGRPCMethod returns a copy of parent in which the method associated with key (grpcMethodContextKey). +func WithGRPCMethod(ctx context.Context, method string) context.Context { + return context.WithValue(ctx, grpcMethodContextKey, method) +} + +// FromGRPCMethod returns the value associated with this context for key (grpcMethodContextKey). +func FromGRPCMethod(ctx context.Context) string { + if v := ctx.Value(grpcMethodContextKey); v != nil { + if method, ok := v.(string); ok { + return method + } + } + return "" +} diff --git a/internal/observability/metrics/backoff/backoff.go b/internal/observability/metrics/backoff/backoff.go new file mode 100644 index 0000000000..52b5f7851b --- /dev/null +++ b/internal/observability/metrics/backoff/backoff.go @@ -0,0 +1,79 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package backoff + +import ( + "context" + + "github.com/vdaas/vald/internal/backoff" + "github.com/vdaas/vald/internal/observability/metrics" +) + +type backoffMetrics struct { + nameKey metrics.Key + retryCount metrics.Int64Measure +} + +func New() (metrics.Metric, error) { + key, err := metrics.NewKey("backoff_name") + if err != nil { + return nil, err + } + + return &backoffMetrics{ + nameKey: key, + retryCount: *metrics.Int64( + metrics.ValdOrg+"/backoff/retry_count", + "Backoff retry count", + metrics.UnitDimensionless), + }, nil +} + +func (*backoffMetrics) Measurement(_ context.Context) ([]metrics.Measurement, error) { + return []metrics.Measurement{}, nil +} + +func (bm *backoffMetrics) MeasurementWithTags(ctx context.Context) ([]metrics.MeasurementWithTags, error) { + ms := backoff.Metrics(ctx) + if len(ms) == 0 { + return []metrics.MeasurementWithTags{}, nil + } + + mts := make([]metrics.MeasurementWithTags, 0, len(ms)) + for name, cnt := range ms { + mts = append(mts, metrics.MeasurementWithTags{ + Measurement: bm.retryCount.M(cnt), + Tags: map[metrics.Key]string{ + bm.nameKey: name, + }, + }) + } + return mts, nil +} + +func (bm *backoffMetrics) View() []*metrics.View { + return []*metrics.View{ + { + Name: "backoff_retry_count", + Description: bm.retryCount.Description(), + Measure: &bm.retryCount, + TagKeys: []metrics.Key{ + bm.nameKey, + }, + Aggregation: metrics.LastValue(), + }, + } +} diff --git a/internal/runner/runner_race_test.go b/internal/runner/runner_race_test.go index 89a0d379df..30a52a4671 100644 --- a/internal/runner/runner_race_test.go +++ b/internal/runner/runner_race_test.go @@ -31,7 +31,7 @@ import ( "github.com/vdaas/vald/internal/test/goleak" ) -func TestDo(t *testing.T) { +func TestDo_for_race(t *testing.T) { type args struct { ctx context.Context opts []Option diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index 298f8bb67a..ca2e0de67e 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -68,7 +68,7 @@ func New(opts ...Option) vald.Server { } func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *payload.Object_ID, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Exists") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Object/Exists"), apiName+".Exists") defer func() { if span != nil { span.End() @@ -200,7 +200,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo } func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Search") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/Search"), apiName+".Search") defer func() { if span != nil { span.End() @@ -265,7 +265,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) ( res *payload.Search_Response, err error, ) { - ctx, span := trace.StartSpan(ctx, apiName+".SearchByID") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/SearchByID"), apiName+".SearchByID") defer func() { if span != nil { span.End() @@ -915,7 +915,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi } func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".LinearSearch") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/LinearSearch"), apiName+".LinearSearch") defer func() { if span != nil { span.End() @@ -978,7 +978,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDRequest) ( res *payload.Search_Response, err error, ) { - ctx, span := trace.StartSpan(ctx, apiName+".LinearSearchByID") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/LinearSearchByID"), apiName+".LinearSearchByID") defer func() { if span != nil { span.End() @@ -1347,7 +1347,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search } func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Insert") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Insert/Insert"), apiName+".Insert") defer func() { if span != nil { span.End() @@ -1580,7 +1580,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) } func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequest) (locs *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiInsert") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Insert/MultiInsert"), apiName+".MultiInsert") defer func() { if span != nil { span.End() @@ -2444,7 +2444,7 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ } func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Remove") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Remove/Remove"), apiName+".Remove") defer func() { if span != nil { span.End() @@ -2610,7 +2610,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) } func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequest) (locs *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiRemove") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Remove/MultiRemove"), apiName+".MultiRemove") defer func() { if span != nil { span.End() @@ -2754,7 +2754,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ } func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorRequest) (vec *payload.Object_Vector, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".GetObject") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Object/GetObject"), apiName+".GetObject") defer func() { if span != nil { span.End() diff --git a/pkg/gateway/lb/usecase/vald.go b/pkg/gateway/lb/usecase/vald.go index d0f0635700..c10e3fe8da 100644 --- a/pkg/gateway/lb/usecase/vald.go +++ b/pkg/gateway/lb/usecase/vald.go @@ -26,6 +26,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/metric" "github.com/vdaas/vald/internal/observability" + backoffmetrics "github.com/vdaas/vald/internal/observability/metrics/backoff" "github.com/vdaas/vald/internal/runner" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/servers/server" @@ -67,7 +68,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) { var obs observability.Observability if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig(cfg.Observability) + bom, err := backoffmetrics.New() + if err != nil { + return nil, err + } + obs, err = observability.NewWithConfig(cfg.Observability, bom) if err != nil { return nil, err } diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index 09b4c98522..5bf1c33b2d 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -118,7 +118,7 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { return finalize() case err = <-dech: case <-it.C: - err = idx.execute(ctx, true, false) + err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), true, false) if err != nil { ech <- err log.Error("an error occurred during indexing", err) @@ -126,7 +126,7 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { } it.Reset(idx.indexDuration) case <-itl.C: - err = idx.execute(ctx, false, false) + err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateIndex"), false, false) if err != nil { ech <- err log.Error("an error occurred during indexing", err) @@ -134,7 +134,7 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { } itl.Reset(idx.indexDurationLimit) case <-stl.C: - err = idx.execute(ctx, false, true) + err = idx.execute(grpc.WithGRPCMethod(ctx, "core.v1.Agent/CreateAndSaveIndex"), false, true) if err != nil { ech <- err log.Error("an error occurred during indexing and saving", err) @@ -159,7 +159,7 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { return case addr := <-idx.saveIndexTargetAddrCh: idx.schMap.Delete(addr) - _, err = idx.client.GetClient().Do(ctx, addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (_ interface{}, err error) { + _, err = idx.client.GetClient().Do(grpc.WithGRPCMethod(ctx, "core.v1.Agent/SaveIndex"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (_ interface{}, err error) { return agent.NewAgentClient(conn).SaveIndex(ctx, &payload.Empty{}, copts...) }) if err != nil { @@ -262,7 +262,7 @@ func (idx *index) waitForNextSaving(ctx context.Context) { } func (idx *index) loadInfos(ctx context.Context) (err error) { - ctx, span := trace.StartSpan(ctx, "vald/manager-index/service/Indexer.loadInfos") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "core.v1.Agent/IndexInfo"), "vald/manager-index/service/Indexer.loadInfos") defer func() { if span != nil { span.End() diff --git a/pkg/manager/index/usecase/indexer.go b/pkg/manager/index/usecase/indexer.go index d6f078cce7..592beb5a50 100644 --- a/pkg/manager/index/usecase/indexer.go +++ b/pkg/manager/index/usecase/indexer.go @@ -26,7 +26,8 @@ import ( "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/metric" "github.com/vdaas/vald/internal/observability" - metrics "github.com/vdaas/vald/internal/observability/metrics/manager/index" + backoffmetrics "github.com/vdaas/vald/internal/observability/metrics/backoff" + indexmetrics "github.com/vdaas/vald/internal/observability/metrics/manager/index" "github.com/vdaas/vald/internal/runner" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/servers/server" @@ -69,7 +70,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) { var obs observability.Observability if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig(cfg.Observability) + bom, err := backoffmetrics.New() + if err != nil { + return nil, err + } + obs, err = observability.NewWithConfig(cfg.Observability, bom) if err != nil { return nil, err } @@ -126,7 +131,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { if cfg.Observability.Enabled { obs, err = observability.NewWithConfig( cfg.Observability, - metrics.New(indexer), + indexmetrics.New(indexer), ) if err != nil { return nil, err From a2122a09e7d316a13df2b77654900f8fd38fe99e Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 2 Jun 2022 15:45:46 +0900 Subject: [PATCH 2/7] make format Signed-off-by: hlts2 --- pkg/manager/index/service/indexer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index 5bf1c33b2d..3d090a9378 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -159,9 +159,10 @@ func (idx *index) Start(ctx context.Context) (<-chan error, error) { return case addr := <-idx.saveIndexTargetAddrCh: idx.schMap.Delete(addr) - _, err = idx.client.GetClient().Do(grpc.WithGRPCMethod(ctx, "core.v1.Agent/SaveIndex"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (_ interface{}, err error) { - return agent.NewAgentClient(conn).SaveIndex(ctx, &payload.Empty{}, copts...) - }) + _, err = idx.client.GetClient(). + Do(grpc.WithGRPCMethod(ctx, "core.v1.Agent/SaveIndex"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (_ interface{}, err error) { + return agent.NewAgentClient(conn).SaveIndex(ctx, &payload.Empty{}, copts...) + }) if err != nil { log.Warnf("an error occurred while calling SaveIndex of %s: %s", addr, err) select { From e7334df1481b6ef758934cbba6e5d119e8db94db Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 2 Jun 2022 15:46:52 +0900 Subject: [PATCH 3/7] make gotests/gen Signed-off-by: hlts2 --- internal/backoff/backoff_test.go | 74 ++++ internal/backoff/context_test.go | 176 +++++++++ internal/net/grpc/context_test.go | 176 +++++++++ .../metrics/backoff/backoff_test.go | 365 ++++++++++++++++++ 4 files changed, 791 insertions(+) create mode 100644 internal/backoff/context_test.go create mode 100644 internal/net/grpc/context_test.go create mode 100644 internal/observability/metrics/backoff/backoff_test.go diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go index 8a2539d9b6..3f776da931 100644 --- a/internal/backoff/backoff_test.go +++ b/internal/backoff/backoff_test.go @@ -617,3 +617,77 @@ func Test_backoff_Do(t *testing.T) { }) } } + +func TestMetrics(t *testing.T) { + type args struct { + in0 context.Context + } + type want struct { + want map[string]int64 + } + type test struct { + name string + args args + want want + checkFunc func(want, map[string]int64) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got map[string]int64) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + in0: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + in0: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := Metrics(test.args.in0) + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/internal/backoff/context_test.go b/internal/backoff/context_test.go new file mode 100644 index 0000000000..aa54849898 --- /dev/null +++ b/internal/backoff/context_test.go @@ -0,0 +1,176 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package backoff + +import ( + "context" + "reflect" + "testing" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/test/goleak" +) + +func TestWithBackoffName(t *testing.T) { + type args struct { + ctx context.Context + name string + } + type want struct { + want context.Context + } + type test struct { + name string + args args + want want + checkFunc func(want, context.Context) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got context.Context) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + name: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + name: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := WithBackoffName(test.args.ctx, test.args.name) + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestFromBackoffName(t *testing.T) { + type args struct { + ctx context.Context + } + type want struct { + want string + } + type test struct { + name string + args args + want want + checkFunc func(want, string) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got string) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := FromBackoffName(test.args.ctx) + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/internal/net/grpc/context_test.go b/internal/net/grpc/context_test.go new file mode 100644 index 0000000000..f60fda47ac --- /dev/null +++ b/internal/net/grpc/context_test.go @@ -0,0 +1,176 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package grpc + +import ( + "context" + "reflect" + "testing" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/test/goleak" +) + +func TestWithGRPCMethod(t *testing.T) { + type args struct { + ctx context.Context + method string + } + type want struct { + want context.Context + } + type test struct { + name string + args args + want want + checkFunc func(want, context.Context) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got context.Context) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + method: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + method: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := WithGRPCMethod(test.args.ctx, test.args.method) + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestFromGRPCMethod(t *testing.T) { + type args struct { + ctx context.Context + } + type want struct { + want string + } + type test struct { + name string + args args + want want + checkFunc func(want, string) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got string) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := FromGRPCMethod(test.args.ctx) + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/internal/observability/metrics/backoff/backoff_test.go b/internal/observability/metrics/backoff/backoff_test.go new file mode 100644 index 0000000000..7505b2c9a0 --- /dev/null +++ b/internal/observability/metrics/backoff/backoff_test.go @@ -0,0 +1,365 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package backoff + +import ( + "context" + "reflect" + "testing" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/observability/metrics" + "github.com/vdaas/vald/internal/test/goleak" +) + +func TestNew(t *testing.T) { + type want struct { + want metrics.Metric + err error + } + type test struct { + name string + want want + checkFunc func(want, metrics.Metric, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got metrics.Metric, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got, err := New() + if err := checkFunc(test.want, got, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_backoffMetrics_Measurement(t *testing.T) { + type args struct { + in0 context.Context + } + type fields struct { + nameKey metrics.Key + retryCount metrics.Int64Measure + } + type want struct { + want []metrics.Measurement + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, []metrics.Measurement, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got []metrics.Measurement, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + in0: nil, + }, + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + in0: nil, + }, + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + b := &backoffMetrics{ + nameKey: test.fields.nameKey, + retryCount: test.fields.retryCount, + } + + got, err := b.Measurement(test.args.in0) + if err := checkFunc(test.want, got, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_backoffMetrics_MeasurementWithTags(t *testing.T) { + type args struct { + ctx context.Context + } + type fields struct { + nameKey metrics.Key + retryCount metrics.Int64Measure + } + type want struct { + want []metrics.MeasurementWithTags + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, []metrics.MeasurementWithTags, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got []metrics.MeasurementWithTags, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + }, + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + }, + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + bm := &backoffMetrics{ + nameKey: test.fields.nameKey, + retryCount: test.fields.retryCount, + } + + got, err := bm.MeasurementWithTags(test.args.ctx) + if err := checkFunc(test.want, got, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_backoffMetrics_View(t *testing.T) { + type fields struct { + nameKey metrics.Key + retryCount metrics.Int64Measure + } + type want struct { + want []*metrics.View + } + type test struct { + name string + fields fields + want want + checkFunc func(want, []*metrics.View) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got []*metrics.View) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + fields: fields { + nameKey: nil, + retryCount: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + bm := &backoffMetrics{ + nameKey: test.fields.nameKey, + retryCount: test.fields.retryCount, + } + + got := bm.View() + if err := checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} From a249e8468ecf359aeeeba09e455293e5cae6afcc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 2 Jun 2022 19:21:51 +0900 Subject: [PATCH 4/7] add const variable for grpc method name Signed-off-by: hlts2 --- apis/grpc/v1/vald/vald.go | 62 ++++++++++++++++ pkg/agent/core/ngt/handler/grpc/handler.go | 54 +++++++------- pkg/gateway/filter/handler/grpc/handler.go | 84 +++++++++++----------- pkg/gateway/lb/handler/grpc/handler.go | 54 +++++++------- 4 files changed, 158 insertions(+), 96 deletions(-) diff --git a/apis/grpc/v1/vald/vald.go b/apis/grpc/v1/vald/vald.go index 16dab72fea..67104f3f8b 100644 --- a/apis/grpc/v1/vald/vald.go +++ b/apis/grpc/v1/vald/vald.go @@ -58,11 +58,73 @@ type Client interface { ObjectClient } +const ( + PackageName = "vald.v1" + + InsertRPCServiceName = "Insert" + InsertRPCName = "Insert" + StreamInsertRPCName = "StreamInsert" + MultiInsertRPCName = "MultiInsert" + + UpdateRPCServiceName = "Update" + UpdateRPCName = "Update" + StreamUpdateRPCName = "StreamUpdate" + MultiUpdateRPCName = "MultiUpdate" + + UpsertRPCServiceName = "Upsert" + UpsertRPCName = "Upsert" + StreamUpsertRPCName = "StreamUpsert" + MultiUpsertRPCName = "MultiUpsert" + + SearchRPCServiceName = "Search" + SearchRPCName = "Search" + SearchByIDRPCName = "SearchByID" + StreamSearchRPCName = "StreamSearch" + StreamSearchByIDRPCName = "StreamSearchByID" + MultiSearchRPCName = "MultiSearch" + MultiSearchByIDRPCName = "MultiSearchByID" + LinearSearchRPCName = "LinearSearch" + LinearSearchByIDRPCName = "LinearSearchByID" + StreamLinearSearchRPCName = "StreamLinearSearch" + StreamLinearSearchByIDRPCName = "StreamLinearSearchByID" + MultiLinearSearchRPCName = "MultiLinearSearch" + MultiLinearSearchByIDRPCName = "MultiLinearSearchByID" + + RemoveRPCServiceName = "Remove" + RemoveRPCName = "Remove" + StreamRemoveRPCName = "StreamRemove" + MultiRemoveRPCName = "MultiRemove" + + ObjectRPCServiceName = "Object" + ExistsRPCName = "Exists" + GetObjectRPCName = "GetObject" + StreamGetObjectRPCName = "StreamGetObject" +) + type ClientWithFilter interface { Client FilterClient } +const ( + FilterRPCServiceName = "Filter" + SearchObjectRPCName = "SearchObject" + MultiSearchObjectRPCName = "MultiSearchObject" + LinearSearchObjectRPCName = "LinearSearchObject" + MultiLinearSearchObjectRPCName = "MultiLinearSearchObject" + StreamLinearSearchObjectRPCName = "StreamLinearSearchObject" + StreamSearchObjectRPCName = "StreamSearchObject" + InsertObjectRPCName = "InsertObject" + StreamInsertObjectRPCName = "StreamInsertObject" + MultiInsertObjectRPCName = "MultiInsertObject" + UpdateObjectRPCName = "UpdateObject" + StreamUpdateObjectRPCName = "StreamUpdateObject" + MultiUpdateObjectRPCName = "MultiUpdateObject" + UpsertObjectRPCName = "UpsertObject" + StreamUpsertObjectRPCName = "StreamUpsertObject" + MultiUpsertObjectRPCName = "MultiUpsertObject" +) + type client struct { InsertClient UpdateClient diff --git a/pkg/agent/core/ngt/handler/grpc/handler.go b/pkg/agent/core/ngt/handler/grpc/handler.go index dae0593029..e3e12f209f 100644 --- a/pkg/agent/core/ngt/handler/grpc/handler.go +++ b/pkg/agent/core/ngt/handler/grpc/handler.go @@ -108,7 +108,7 @@ func (s *server) newLocation(uuid string) *payload.Object_Location { } func (s *server) Exists(ctx context.Context, uid *payload.Object_ID) (res *payload.Object_ID, err error) { - _, span := trace.StartSpan(ctx, apiName+".Exists") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.ExistsRPCName) defer func() { if span != nil { span.End() @@ -165,7 +165,7 @@ func (s *server) Exists(ctx context.Context, uid *payload.Object_ID) (res *paylo } func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - _, span := trace.StartSpan(ctx, apiName+".Search") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchRPCName) defer func() { if span != nil { span.End() @@ -287,7 +287,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * } func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) (res *payload.Search_Response, err error) { - _, span := trace.StartSpan(ctx, apiName+".SearchByID") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchByIDRPCName) defer func() { if span != nil { span.End() @@ -441,7 +441,7 @@ func toSearchResponse(dists []model.Distance, err error) (res *payload.Search_Re } func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchRPCName) defer func() { if span != nil { span.End() @@ -488,7 +488,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) } func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -533,7 +533,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er } func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchRPCName) defer func() { if span != nil { span.End() @@ -603,7 +603,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ } func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -673,7 +673,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi } func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - _, span := trace.StartSpan(ctx, apiName+".LinearSearch") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.LinearSearchRPCName) defer func() { if span != nil { span.End() @@ -793,7 +793,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) } func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDRequest) (res *payload.Search_Response, err error) { - _, span := trace.StartSpan(ctx, apiName+".LinearSearchByID") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.LinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -927,7 +927,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq } func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -974,7 +974,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) } func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1019,7 +1019,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI } func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -1089,7 +1089,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul } func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1160,7 +1160,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search // Insert inserts a vector to the NGT. func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (res *payload.Object_Location, err error) { - _, span := trace.StartSpan(ctx, apiName+".Insert") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.InsertRPCName) defer func() { if span != nil { span.End() @@ -1256,7 +1256,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (res * } func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamInsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamInsertRPCName) defer func() { if span != nil { span.End() @@ -1302,7 +1302,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) } func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequest) (res *payload.Object_Locations, err error) { - _, span := trace.StartSpan(ctx, apiName+".MultiInsert") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiInsertRPCName) defer func() { if span != nil { span.End() @@ -1406,7 +1406,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ } func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res *payload.Object_Location, err error) { - _, span := trace.StartSpan(ctx, apiName+".Update") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.UpdateRPCName) defer func() { if span != nil { span.End() @@ -1531,7 +1531,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * } func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpdate") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpdateRPCName) defer func() { if span != nil { span.End() @@ -1577,7 +1577,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) } func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequest) (res *payload.Object_Locations, err error) { - _, span := trace.StartSpan(ctx, apiName+".MultiUpdate") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpdateRPCName) defer func() { if span != nil { span.End() @@ -1711,7 +1711,7 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ } func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Upsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpsertRPCName) defer func() { if span != nil { span.End() @@ -1808,7 +1808,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * } func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpsertRPCName) defer func() { if span != nil { span.End() @@ -1854,7 +1854,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) } func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequest) (res *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpsertRPCName) defer func() { if span != nil { span.End() @@ -2004,7 +2004,7 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ } func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (res *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Remove") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.RemoveRPCName) defer func() { if span != nil { span.End() @@ -2094,7 +2094,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (res * } func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamRemove") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamRemoveRPCName) defer func() { if span != nil { span.End() @@ -2140,7 +2140,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) } func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequest) (res *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiRemove") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiRemoveRPCName) defer func() { if span != nil { span.End() @@ -2215,7 +2215,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ } func (s *server) GetObject(ctx context.Context, id *payload.Object_VectorRequest) (res *payload.Object_Vector, err error) { - _, span := trace.StartSpan(ctx, apiName+".GetObject") + _, span := trace.StartSpan(ctx, apiName+"/"+vald.GetObjectRPCName) defer func() { if span != nil { span.End() @@ -2272,7 +2272,7 @@ func (s *server) GetObject(ctx context.Context, id *payload.Object_VectorRequest } func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamGetObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamGetObjectRPCName) defer func() { if span != nil { span.End() diff --git a/pkg/gateway/filter/handler/grpc/handler.go b/pkg/gateway/filter/handler/grpc/handler.go index 1728b51cc7..5eae935ccc 100644 --- a/pkg/gateway/filter/handler/grpc/handler.go +++ b/pkg/gateway/filter/handler/grpc/handler.go @@ -69,7 +69,7 @@ func New(opts ...Option) vald.ServerWithFilter { } func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (*payload.Search_Response, error) { - ctx, span := trace.StartSpan(ctx, apiName+".SearchObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchObjectRPCName) defer func() { if span != nil { span.End() @@ -106,7 +106,7 @@ func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectReq } func (s *server) MultiSearchObject(ctx context.Context, reqs *payload.Search_MultiObjectRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiInsertObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchObjectRPCName) defer func() { if span != nil { span.End() @@ -151,7 +151,7 @@ func (s *server) MultiSearchObject(ctx context.Context, reqs *payload.Search_Mul } func (s *server) StreamSearchObject(stream vald.Filter_StreamSearchObjectServer) error { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearchObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchObjectRPCName) defer func() { if span != nil { span.End() @@ -182,7 +182,7 @@ func (s *server) StreamSearchObject(stream vald.Filter_StreamSearchObjectServer) } func (s *server) LinearSearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (*payload.Search_Response, error) { - ctx, span := trace.StartSpan(ctx, apiName+".LinearSearchObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.LinearSearchObjectRPCName) defer func() { if span != nil { span.End() @@ -219,7 +219,7 @@ func (s *server) LinearSearchObject(ctx context.Context, req *payload.Search_Obj } func (s *server) MultiLinearSearchObject(ctx context.Context, reqs *payload.Search_MultiObjectRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearchObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchObjectRPCName) defer func() { if span != nil { span.End() @@ -264,7 +264,7 @@ func (s *server) MultiLinearSearchObject(ctx context.Context, reqs *payload.Sear } func (s *server) StreamLinearSearchObject(stream vald.Filter_StreamSearchObjectServer) error { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearchObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchObjectRPCName) defer func() { if span != nil { span.End() @@ -295,7 +295,7 @@ func (s *server) StreamLinearSearchObject(stream vald.Filter_StreamSearchObjectS } func (s *server) InsertObject(ctx context.Context, req *payload.Insert_ObjectRequest) (*payload.Object_Location, error) { - ctx, span := trace.StartSpan(ctx, apiName+".InsertObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.InsertObjectRPCName) defer func() { if span != nil { span.End() @@ -333,7 +333,7 @@ func (s *server) InsertObject(ctx context.Context, req *payload.Insert_ObjectReq } func (s *server) StreamInsertObject(stream vald.Filter_StreamInsertObjectServer) error { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamInsertObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamInsertObjectRPCName) defer func() { if span != nil { span.End() @@ -365,7 +365,7 @@ func (s *server) StreamInsertObject(stream vald.Filter_StreamInsertObjectServer) } func (s *server) MultiInsertObject(ctx context.Context, reqs *payload.Insert_MultiObjectRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiInsertObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiInsertObjectRPCName) defer func() { if span != nil { span.End() @@ -410,7 +410,7 @@ func (s *server) MultiInsertObject(ctx context.Context, reqs *payload.Insert_Mul } func (s *server) UpdateObject(ctx context.Context, req *payload.Update_ObjectRequest) (*payload.Object_Location, error) { - ctx, span := trace.StartSpan(ctx, apiName+".UpdateObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpdateObjectRPCName) defer func() { if span != nil { span.End() @@ -449,7 +449,7 @@ func (s *server) UpdateObject(ctx context.Context, req *payload.Update_ObjectReq } func (s *server) StreamUpdateObject(stream vald.Filter_StreamUpdateObjectServer) error { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpdateObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpdateObjectRPCName) defer func() { if span != nil { span.End() @@ -480,7 +480,7 @@ func (s *server) StreamUpdateObject(stream vald.Filter_StreamUpdateObjectServer) } func (s *server) MultiUpdateObject(ctx context.Context, reqs *payload.Update_MultiObjectRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpdateObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpdateObjectRPCName) defer func() { if span != nil { span.End() @@ -525,7 +525,7 @@ func (s *server) MultiUpdateObject(ctx context.Context, reqs *payload.Update_Mul } func (s *server) UpsertObject(ctx context.Context, req *payload.Upsert_ObjectRequest) (*payload.Object_Location, error) { - ctx, span := trace.StartSpan(ctx, apiName+".UpsertObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpsertObjectRPCName) defer func() { if span != nil { span.End() @@ -563,7 +563,7 @@ func (s *server) UpsertObject(ctx context.Context, req *payload.Upsert_ObjectReq } func (s *server) StreamUpsertObject(stream vald.Filter_StreamUpsertObjectServer) error { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpsertObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpsertObjectRPCName) defer func() { if span != nil { span.End() @@ -594,7 +594,7 @@ func (s *server) StreamUpsertObject(stream vald.Filter_StreamUpsertObjectServer) } func (s *server) MultiUpsertObject(ctx context.Context, reqs *payload.Upsert_MultiObjectRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpsertObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpsertObjectRPCName) defer func() { if span != nil { span.End() @@ -639,7 +639,7 @@ func (s *server) MultiUpsertObject(ctx context.Context, reqs *payload.Upsert_Mul } func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (*payload.Object_ID, error) { - ctx, span := trace.StartSpan(ctx, apiName+".Exists") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.ExistsRPCName) defer func() { if span != nil { span.End() @@ -649,7 +649,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (*payload. } func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Search") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchRPCName) defer func() { if span != nil { span.End() @@ -701,7 +701,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * } func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".SearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchByIDRPCName) defer func() { if span != nil { span.End() @@ -734,7 +734,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) } func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchRPCName) defer func() { if span != nil { span.End() @@ -774,7 +774,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) } func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -823,7 +823,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er } func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchRPCName) defer func() { if span != nil { span.End() @@ -867,7 +867,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ } func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -911,7 +911,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi } func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".LinearSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.LinearSearchRPCName) defer func() { if span != nil { span.End() @@ -963,7 +963,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) } func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDRequest) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".LinearSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.LinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -996,7 +996,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq } func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -1036,7 +1036,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) } func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1085,7 +1085,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI } func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -1129,7 +1129,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul } func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1173,7 +1173,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search } func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Insert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.InsertRPCName) defer func() { if span != nil { span.End() @@ -1243,7 +1243,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc * } func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamInsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamInsertRPCName) defer func() { if span != nil { span.End() @@ -1293,7 +1293,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) } func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiInsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiInsertRPCName) defer func() { if span != nil { span.End() @@ -1337,7 +1337,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ } func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Update") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpdateRPCName) defer func() { if span != nil { span.End() @@ -1407,7 +1407,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc * } func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpdate") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpdateRPCName) defer func() { if span != nil { span.End() @@ -1457,7 +1457,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) } func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpdate") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpdateRPCName) defer func() { if span != nil { span.End() @@ -1501,7 +1501,7 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ } func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Upsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpsertRPCName) defer func() { if span != nil { span.End() @@ -1571,7 +1571,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * } func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpsertRPCName) defer func() { if span != nil { span.End() @@ -1621,7 +1621,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) } func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpsertRPCName) defer func() { if span != nil { span.End() @@ -1665,7 +1665,7 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ } func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Remove") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.RemoveRPCName) defer func() { if span != nil { span.End() @@ -1675,7 +1675,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc * } func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamRemove") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamRemoveRPCName) defer func() { if span != nil { span.End() @@ -1725,7 +1725,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) } func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequest) (locs *payload.Object_Locations, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiRemove") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiRemoveRPCName) defer func() { if span != nil { span.End() @@ -1769,7 +1769,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ } func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorRequest) (vec *payload.Object_Vector, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".GetObject") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.GetObjectRPCName) defer func() { if span != nil { span.End() @@ -1802,7 +1802,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques } func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamGetObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamGetObjectRPCName) defer func() { if span != nil { span.End() diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index ca2e0de67e..6744426a5e 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -68,7 +68,7 @@ func New(opts ...Option) vald.Server { } func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *payload.Object_ID, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Object/Exists"), apiName+".Exists") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.ExistsRPCName), apiName+"/"+vald.ExistsRPCName) defer func() { if span != nil { span.End() @@ -200,7 +200,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo } func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/Search"), apiName+".Search") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.SearchRPCServiceName+"/"+vald.SearchRPCName), apiName+"/"+vald.SearchRPCName) defer func() { if span != nil { span.End() @@ -265,7 +265,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) ( res *payload.Search_Response, err error, ) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/SearchByID"), apiName+".SearchByID") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.SearchRPCServiceName+"/"+vald.SearchByIDRPCName), apiName+"/"+vald.SearchRPCName) defer func() { if span != nil { span.End() @@ -662,7 +662,7 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, } func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchRPCName) defer func() { if span != nil { span.End() @@ -709,7 +709,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) } func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -756,7 +756,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er } func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchRPCName) defer func() { if span != nil { span.End() @@ -845,7 +845,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ } func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -915,7 +915,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi } func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) (res *payload.Search_Response, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/LinearSearch"), apiName+".LinearSearch") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.SearchRPCServiceName+"/"+vald.LinearSearchRPCName), apiName+"/"+vald.LinearSearchRPCName) defer func() { if span != nil { span.End() @@ -978,7 +978,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDRequest) ( res *payload.Search_Response, err error, ) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Search/LinearSearchByID"), apiName+".LinearSearchByID") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.SearchRPCServiceName+"/"+vald.LinearSearchByIDRPCName), apiName+"/"+vald.LinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1094,7 +1094,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq } func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearch") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -1141,7 +1141,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) } func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByIDServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamLinearSearchByID") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1188,7 +1188,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI } func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_MultiRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearch") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchRPCName) defer func() { if span != nil { span.End() @@ -1277,7 +1277,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul } func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search_MultiIDRequest) (res *payload.Search_Responses, errs error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiLinearSearchByID") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiLinearSearchByIDRPCName) defer func() { if span != nil { span.End() @@ -1347,7 +1347,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search } func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Insert/Insert"), apiName+".Insert") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.InsertRPCServiceName+"/"+vald.InsertRPCName), apiName+"/"+vald.InsertRPCName) defer func() { if span != nil { span.End() @@ -1534,7 +1534,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p } func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamInsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamInsertRPCName) defer func() { if span != nil { span.End() @@ -1580,7 +1580,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) } func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequest) (locs *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Insert/MultiInsert"), apiName+".MultiInsert") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.InsertRPCServiceName+"/"+vald.MultiInsertRPCName), apiName+"/"+vald.MultiInsertRPCName) defer func() { if span != nil { span.End() @@ -1748,7 +1748,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ } func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Update") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpdateRPCName) defer func() { if span != nil { span.End() @@ -1915,7 +1915,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * } func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpdate") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpdateRPCName) defer func() { if span != nil { span.End() @@ -1961,7 +1961,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) } func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequest) (res *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpdate") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpdateRPCName) defer func() { if span != nil { span.End() @@ -2099,7 +2099,7 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ } func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Upsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.UpsertRPCName) defer func() { if span != nil { span.End() @@ -2231,7 +2231,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * } func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamUpsert") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamUpsertRPCName) defer func() { if span != nil { span.End() @@ -2277,7 +2277,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) } func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequest) (res *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiUpsert") + ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.MultiUpsertRPCName) defer func() { if span != nil { span.End() @@ -2444,7 +2444,7 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ } func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs *payload.Object_Location, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Remove/Remove"), apiName+".Remove") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.RemoveRPCServiceName+"/"+vald.RemoveRPCName), apiName+"/"+vald.RemoveRPCName) defer func() { if span != nil { span.End() @@ -2564,7 +2564,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs } func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamRemove") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamRemoveRPCName) defer func() { if span != nil { span.End() @@ -2610,7 +2610,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) } func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequest) (locs *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Remove/MultiRemove"), apiName+".MultiRemove") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.RemoveRPCServiceName+"/"+vald.RemoveRPCName), apiName+"/"+vald.MultiRemoveRPCName) defer func() { if span != nil { span.End() @@ -2754,7 +2754,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ } func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorRequest) (vec *payload.Object_Vector, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, "vald.v1.Object/GetObject"), apiName+".GetObject") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.GetObjectRPCName), apiName+"/"+vald.GetObjectRPCName) defer func() { if span != nil { span.End() @@ -2853,7 +2853,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques } func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err error) { - ctx, span := trace.StartSpan(stream.Context(), apiName+".StreamGetObject") + ctx, span := trace.StartSpan(stream.Context(), apiName+"/"+vald.StreamGetObjectRPCName) defer func() { if span != nil { span.End() From c37f0ada4438e364cb3cab0a1ffc3af70f6a7ac7 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 2 Jun 2022 20:10:15 +0900 Subject: [PATCH 5/7] fix bug of grpc method name Signed-off-by: hlts2 --- pkg/gateway/lb/handler/grpc/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index 6744426a5e..0cbb1a49c0 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -2610,7 +2610,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) } func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequest) (locs *payload.Object_Locations, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.RemoveRPCServiceName+"/"+vald.RemoveRPCName), apiName+"/"+vald.MultiRemoveRPCName) + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.RemoveRPCServiceName+"/"+vald.MultiRemoveRPCName), apiName+"/"+vald.MultiRemoveRPCName) defer func() { if span != nil { span.End() From 8c877983fe0d8abc41a11e1255d51e986037ee9b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 3 Jun 2022 10:51:44 +0900 Subject: [PATCH 6/7] apply suggestion Signed-off-by: hlts2 --- apis/grpc/v1/vald/vald.go | 55 ++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/apis/grpc/v1/vald/vald.go b/apis/grpc/v1/vald/vald.go index 67104f3f8b..36888317c3 100644 --- a/apis/grpc/v1/vald/vald.go +++ b/apis/grpc/v1/vald/vald.go @@ -58,25 +58,36 @@ type Client interface { ObjectClient } -const ( - PackageName = "vald.v1" +type ClientWithFilter interface { + Client + FilterClient +} - InsertRPCServiceName = "Insert" - InsertRPCName = "Insert" - StreamInsertRPCName = "StreamInsert" - MultiInsertRPCName = "MultiInsert" +const PackageName = "vald.v1" +const ( + InsertRPCServiceName = "Insert" UpdateRPCServiceName = "Update" - UpdateRPCName = "Update" - StreamUpdateRPCName = "StreamUpdate" - MultiUpdateRPCName = "MultiUpdate" - UpsertRPCServiceName = "Upsert" - UpsertRPCName = "Upsert" - StreamUpsertRPCName = "StreamUpsert" - MultiUpsertRPCName = "MultiUpsert" + SearchRPCServiceName = "Search" + RemoveRPCServiceName = "Remove" + ObjectRPCServiceName = "Object" + FilterRPCServiceName = "Filter" +) + +const ( + InsertRPCName = "Insert" + StreamInsertRPCName = "StreamInsert" + MultiInsertRPCName = "MultiInsert" + + UpdateRPCName = "Update" + StreamUpdateRPCName = "StreamUpdate" + MultiUpdateRPCName = "MultiUpdate" + + UpsertRPCName = "Upsert" + StreamUpsertRPCName = "StreamUpsert" + MultiUpsertRPCName = "MultiUpsert" - SearchRPCServiceName = "Search" SearchRPCName = "Search" SearchByIDRPCName = "SearchByID" StreamSearchRPCName = "StreamSearch" @@ -90,24 +101,14 @@ const ( MultiLinearSearchRPCName = "MultiLinearSearch" MultiLinearSearchByIDRPCName = "MultiLinearSearchByID" - RemoveRPCServiceName = "Remove" - RemoveRPCName = "Remove" - StreamRemoveRPCName = "StreamRemove" - MultiRemoveRPCName = "MultiRemove" + RemoveRPCName = "Remove" + StreamRemoveRPCName = "StreamRemove" + MultiRemoveRPCName = "MultiRemove" - ObjectRPCServiceName = "Object" ExistsRPCName = "Exists" GetObjectRPCName = "GetObject" StreamGetObjectRPCName = "StreamGetObject" -) -type ClientWithFilter interface { - Client - FilterClient -} - -const ( - FilterRPCServiceName = "Filter" SearchObjectRPCName = "SearchObject" MultiSearchObjectRPCName = "MultiSearchObject" LinearSearchObjectRPCName = "LinearSearchObject" From be4654021e0998e7d1f1c055a8573bf80456fc76 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 3 Jun 2022 16:27:35 +0900 Subject: [PATCH 7/7] apply suggestion Signed-off-by: hlts2 --- apis/grpc/v1/vald/vald.go | 79 +++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/apis/grpc/v1/vald/vald.go b/apis/grpc/v1/vald/vald.go index 36888317c3..8eae4432e4 100644 --- a/apis/grpc/v1/vald/vald.go +++ b/apis/grpc/v1/vald/vald.go @@ -76,30 +76,45 @@ const ( ) const ( - InsertRPCName = "Insert" - StreamInsertRPCName = "StreamInsert" - MultiInsertRPCName = "MultiInsert" - - UpdateRPCName = "Update" - StreamUpdateRPCName = "StreamUpdate" - MultiUpdateRPCName = "MultiUpdate" - - UpsertRPCName = "Upsert" - StreamUpsertRPCName = "StreamUpsert" - MultiUpsertRPCName = "MultiUpsert" - - SearchRPCName = "Search" - SearchByIDRPCName = "SearchByID" - StreamSearchRPCName = "StreamSearch" - StreamSearchByIDRPCName = "StreamSearchByID" - MultiSearchRPCName = "MultiSearch" - MultiSearchByIDRPCName = "MultiSearchByID" - LinearSearchRPCName = "LinearSearch" - LinearSearchByIDRPCName = "LinearSearchByID" - StreamLinearSearchRPCName = "StreamLinearSearch" - StreamLinearSearchByIDRPCName = "StreamLinearSearchByID" - MultiLinearSearchRPCName = "MultiLinearSearch" - MultiLinearSearchByIDRPCName = "MultiLinearSearchByID" + InsertRPCName = "Insert" + StreamInsertRPCName = "StreamInsert" + MultiInsertRPCName = "MultiInsert" + InsertObjectRPCName = "InsertObject" + StreamInsertObjectRPCName = "StreamInsertObject" + MultiInsertObjectRPCName = "MultiInsertObject" + + UpdateRPCName = "Update" + StreamUpdateRPCName = "StreamUpdate" + MultiUpdateRPCName = "MultiUpdate" + UpdateObjectRPCName = "UpdateObject" + StreamUpdateObjectRPCName = "StreamUpdateObject" + MultiUpdateObjectRPCName = "MultiUpdateObject" + + UpsertRPCName = "Upsert" + StreamUpsertRPCName = "StreamUpsert" + MultiUpsertRPCName = "MultiUpsert" + UpsertObjectRPCName = "UpsertObject" + StreamUpsertObjectRPCName = "StreamUpsertObject" + MultiUpsertObjectRPCName = "MultiUpsertObject" + + SearchRPCName = "Search" + SearchByIDRPCName = "SearchByID" + StreamSearchRPCName = "StreamSearch" + StreamSearchByIDRPCName = "StreamSearchByID" + MultiSearchRPCName = "MultiSearch" + MultiSearchByIDRPCName = "MultiSearchByID" + LinearSearchRPCName = "LinearSearch" + LinearSearchByIDRPCName = "LinearSearchByID" + StreamLinearSearchRPCName = "StreamLinearSearch" + StreamLinearSearchByIDRPCName = "StreamLinearSearchByID" + MultiLinearSearchRPCName = "MultiLinearSearch" + MultiLinearSearchByIDRPCName = "MultiLinearSearchByID" + SearchObjectRPCName = "SearchObject" + MultiSearchObjectRPCName = "MultiSearchObject" + LinearSearchObjectRPCName = "LinearSearchObject" + MultiLinearSearchObjectRPCName = "MultiLinearSearchObject" + StreamLinearSearchObjectRPCName = "StreamLinearSearchObject" + StreamSearchObjectRPCName = "StreamSearchObject" RemoveRPCName = "Remove" StreamRemoveRPCName = "StreamRemove" @@ -108,22 +123,6 @@ const ( ExistsRPCName = "Exists" GetObjectRPCName = "GetObject" StreamGetObjectRPCName = "StreamGetObject" - - SearchObjectRPCName = "SearchObject" - MultiSearchObjectRPCName = "MultiSearchObject" - LinearSearchObjectRPCName = "LinearSearchObject" - MultiLinearSearchObjectRPCName = "MultiLinearSearchObject" - StreamLinearSearchObjectRPCName = "StreamLinearSearchObject" - StreamSearchObjectRPCName = "StreamSearchObject" - InsertObjectRPCName = "InsertObject" - StreamInsertObjectRPCName = "StreamInsertObject" - MultiInsertObjectRPCName = "MultiInsertObject" - UpdateObjectRPCName = "UpdateObject" - StreamUpdateObjectRPCName = "StreamUpdateObject" - MultiUpdateObjectRPCName = "MultiUpdateObject" - UpsertObjectRPCName = "UpsertObject" - StreamUpsertObjectRPCName = "StreamUpsertObject" - MultiUpsertObjectRPCName = "MultiUpsertObject" ) type client struct {