From d91c758b6734fad973e543328bf2cb9e9f8b5fb6 Mon Sep 17 00:00:00 2001 From: kpango Date: Wed, 6 Sep 2023 14:24:50 +0900 Subject: [PATCH] Fix duplicate make command (#2165) Signed-off-by: kpango --- hack/license/gen/main.go | 2 +- internal/errors/errors.go | 3 + internal/strings/strings.go | 3 +- pkg/gateway/lb/handler/grpc/aggregation.go | 264 ++++++++++----------- 4 files changed, 135 insertions(+), 137 deletions(-) diff --git a/hack/license/gen/main.go b/hack/license/gen/main.go index a7fbcc72df0..23bb40fe3fd 100644 --- a/hack/license/gen/main.go +++ b/hack/license/gen/main.go @@ -38,7 +38,7 @@ var ( {{.Escape}} Copyright (C) 2019-{{.Year}} {{.Maintainer}} {{.Escape}} {{.Escape}} Licensed under the Apache License, Version 2.0 (the "License"); -{{.Escape}} you may not use this file except in compliance with the License. +{{.Escape}} You may not use this file except in compliance with the License. {{.Escape}} You may obtain a copy of the License at {{.Escape}} {{.Escape}} https://www.apache.org/licenses/LICENSE-2.0 diff --git a/internal/errors/errors.go b/internal/errors/errors.go index d4647986f8e..bfde2cdc8ee 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -230,6 +230,9 @@ func Join(errs ...error) error { case 2: switch { case errs[0] != nil && errs[1] != nil: + if errs[0] == errs[1] || errors.Is(errs[0], errs[1]) { + return errs[0] + } var es []error switch x := errs[1].(type) { case *joinError: diff --git a/internal/strings/strings.go b/internal/strings/strings.go index 2031fdefe86..540a4ea7918 100644 --- a/internal/strings/strings.go +++ b/internal/strings/strings.go @@ -16,6 +16,7 @@ package strings import ( "bytes" "strings" + "syscall" "github.com/vdaas/vald/internal/sync" ) @@ -54,7 +55,7 @@ var ( bufferPool = sync.Pool{ New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 1024)) + return bytes.NewBuffer(make([]byte, 0, syscall.Getpagesize())) }, } ) diff --git a/pkg/gateway/lb/handler/grpc/aggregation.go b/pkg/gateway/lb/handler/grpc/aggregation.go index 702abc34e67..c725f9f3b4c 100644 --- a/pkg/gateway/lb/handler/grpc/aggregation.go +++ b/pkg/gateway/lb/handler/grpc/aggregation.go @@ -31,10 +31,8 @@ import ( "github.com/vdaas/vald/internal/net/grpc/errdetails" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" - "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/slices" "github.com/vdaas/vald/internal/sync" - "github.com/vdaas/vald/internal/sync/errgroup" ) type Aggregator interface { @@ -61,8 +59,6 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa num := int(cfg.GetNum()) min := int(cfg.GetMinNum()) - eg, ectx := errgroup.New(ctx) - var cancel context.CancelFunc var timeout time.Duration if to := cfg.GetTimeout(); to != 0 { timeout = time.Duration(to) @@ -70,69 +66,134 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa timeout = s.timeout } - ectx, cancel = context.WithTimeout(ectx, timeout) - aggr.Start(ectx) - eg.Go(safety.RecoverFunc(func() error { - defer cancel() - return s.gateway.BroadCast(ectx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { - sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target) - defer func() { + ctx, cancel := context.WithTimeout(ctx, timeout) + aggr.Start(ctx) + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target) + defer func() { + if sspan != nil { + sspan.End() + } + }() + r, err := f(sctx, vc, copts...) + if err != nil { + switch { + case errors.Is(err, context.Canceled), + errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)): if sspan != nil { - sspan.End() + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeCancelled( + errdetails.ValdGRPCResourceTypePrefix + + "/vald.v1.search.BroadCast/" + + target + " canceled: " + err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) } - }() - r, err := f(sctx, vc, copts...) - if err != nil { - switch { - case errors.Is(err, context.Canceled), - errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)): - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.StatusCodeCancelled( - errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.search.BroadCast/" + - target + " canceled: " + err.Error())...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - case errors.Is(err, context.DeadlineExceeded), - errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)): - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( - errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.search.BroadCast/" + - target + " deadline_exceeded: " + err.Error())...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - default: - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse search gRPC error response", - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }) - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - switch st.Code() { - case codes.Internal, - codes.Unavailable, - codes.ResourceExhausted: - log.Warn(err) - return err - case codes.NotFound, - codes.Aborted, - codes.InvalidArgument: - return nil - } + return nil + case errors.Is(err, context.DeadlineExceeded), + errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)): + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( + errdetails.ValdGRPCResourceTypePrefix + + "/vald.v1.search.BroadCast/" + + target + " deadline_exceeded: " + err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) } - log.Debug(err) return nil + default: + st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response", + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", + ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), + }) + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetStatus(trace.StatusError, err.Error()) + } + switch st.Code() { + case codes.Internal, + codes.Unavailable, + codes.ResourceExhausted: + log.Warn(err) + return err + case codes.NotFound, + codes.Aborted, + codes.InvalidArgument: + return nil + } } - if r == nil || len(r.GetResults()) == 0 { - select { - case <-sctx.Done(): + log.Debug(err) + return nil + } + if r == nil || len(r.GetResults()) == 0 { + select { + case <-sctx.Done(): + err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", + ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), + }) + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeNotFound(err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) + } + log.Debug(err) + return nil + default: + r, err = f(sctx, vc, copts...) + if err != nil { + switch { + case errors.Is(err, context.Canceled), + errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)): + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeCancelled( + errdetails.ValdGRPCResourceTypePrefix + + "/vald.v1.search.BroadCast/" + + target + " canceled: " + err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) + } + return nil + case errors.Is(err, context.DeadlineExceeded), + errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)): + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( + errdetails.ValdGRPCResourceTypePrefix + + "/vald.v1.search.BroadCast/" + + target + " deadline_exceeded: " + err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) + } + return nil + default: + st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response", + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", + ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), + }) + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetStatus(trace.StatusError, err.Error()) + } + switch st.Code() { + case codes.Internal, + codes.Unavailable, + codes.ResourceExhausted: + log.Warn(err) + return err + case codes.NotFound, + codes.Aborted, + codes.InvalidArgument: + return nil + } + } + log.Debug(err) + return nil + } + if r == nil || len(r.GetResults()) == 0 { err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult, &errdetails.ResourceInfo{ ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", @@ -145,80 +206,13 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa } log.Debug(err) return nil - default: - r, err = f(sctx, vc, copts...) - if err != nil { - switch { - case errors.Is(err, context.Canceled), - errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)): - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.StatusCodeCancelled( - errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.search.BroadCast/" + - target + " canceled: " + err.Error())...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - case errors.Is(err, context.DeadlineExceeded), - errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)): - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( - errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.search.BroadCast/" + - target + " deadline_exceeded: " + err.Error())...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - default: - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse search gRPC error response", - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }) - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - switch st.Code() { - case codes.Internal, - codes.Unavailable, - codes.ResourceExhausted: - log.Warn(err) - return err - case codes.NotFound, - codes.Aborted, - codes.InvalidArgument: - return nil - } - } - log.Debug(err) - return nil - } - if r == nil || len(r.GetResults()) == 0 { - err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }) - if sspan != nil { - sspan.RecordError(err) - sspan.SetAttributes(trace.StatusCodeNotFound(err.Error())...) - sspan.SetStatus(trace.StatusError, err.Error()) - } - log.Debug(err) - return nil - } } } - aggr.Send(sctx, r) - return nil - }) - })) - - <-ectx.Done() // Blocking here - - err = eg.Wait() + } + aggr.Send(sctx, r) + return nil + }) + cancel() if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { err = status.WrapWithInternal("search API connection not found", err, &errdetails.RequestInfo{ @@ -241,7 +235,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa res.Results = res.GetResults()[:num] } - if errors.Is(ectx.Err(), context.DeadlineExceeded) { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { if len(res.GetResults()) == 0 { err = status.WrapWithDeadlineExceeded( "error search result length is 0 due to the timeoutage limit",