Skip to content

Commit

Permalink
Fix duplicate make command (#2165)
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Sep 6, 2023
1 parent ac21880 commit d91c758
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 137 deletions.
2 changes: 1 addition & 1 deletion hack/license/gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
{{.Escape}} Copyright (C) 2019-{{.Year}} {{.Maintainer}}
{{.Escape}}
{{.Escape}} Licensed under the Apache License, Version 2.0 (the "License");
{{.Escape}} you may not use this file except in compliance with the License.
{{.Escape}} You may not use this file except in compliance with the License.
{{.Escape}} You may obtain a copy of the License at
{{.Escape}}
{{.Escape}} https://www.apache.org/licenses/LICENSE-2.0
Expand Down
3 changes: 3 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func Join(errs ...error) error {
case 2:
switch {
case errs[0] != nil && errs[1] != nil:
if errs[0] == errs[1] || errors.Is(errs[0], errs[1]) {
return errs[0]
}
var es []error
switch x := errs[1].(type) {
case *joinError:
Expand Down
3 changes: 2 additions & 1 deletion internal/strings/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package strings
import (
"bytes"
"strings"
"syscall"

"github.com/vdaas/vald/internal/sync"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ var (

bufferPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
return bytes.NewBuffer(make([]byte, 0, syscall.Getpagesize()))
},
}
)
Expand Down
264 changes: 129 additions & 135 deletions pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ import (
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/slices"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
)

type Aggregator interface {
Expand All @@ -61,78 +59,141 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa

num := int(cfg.GetNum())
min := int(cfg.GetMinNum())
eg, ectx := errgroup.New(ctx)
var cancel context.CancelFunc
var timeout time.Duration
if to := cfg.GetTimeout(); to != 0 {
timeout = time.Duration(to)
} else {
timeout = s.timeout
}

ectx, cancel = context.WithTimeout(ectx, timeout)
aggr.Start(ectx)
eg.Go(safety.RecoverFunc(func() error {
defer cancel()
return s.gateway.BroadCast(ectx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target)
defer func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
aggr.Start(ctx)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target)
defer func() {
if sspan != nil {
sspan.End()
}
}()
r, err := f(sctx, vc, copts...)
if err != nil {
switch {
case errors.Is(err, context.Canceled),
errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)):
if sspan != nil {
sspan.End()
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeCancelled(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
}()
r, err := f(sctx, vc, copts...)
if err != nil {
switch {
case errors.Is(err, context.Canceled),
errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeCancelled(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeDeadlineExceeded(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
default:
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse search gRPC error response",
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetStatus(trace.StatusError, err.Error())
}
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
log.Warn(err)
return err
case codes.NotFound,
codes.Aborted,
codes.InvalidArgument:
return nil
}
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeDeadlineExceeded(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetStatus(trace.StatusError, err.Error())
}
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
log.Warn(err)
return err
case codes.NotFound,
codes.Aborted,
codes.InvalidArgument:
return nil
}
}
if r == nil || len(r.GetResults()) == 0 {
select {
case <-sctx.Done():
log.Debug(err)
return nil
}
if r == nil || len(r.GetResults()) == 0 {
select {
case <-sctx.Done():
err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult,
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeNotFound(err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)
return nil
default:
r, err = f(sctx, vc, copts...)
if err != nil {
switch {
case errors.Is(err, context.Canceled),
errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeCancelled(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeDeadlineExceeded(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return nil
default:
st, msg, err := status.ParseError(err, codes.Unknown, "failed to parse search gRPC error response",
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetStatus(trace.StatusError, err.Error())
}
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
log.Warn(err)
return err
case codes.NotFound,
codes.Aborted,
codes.InvalidArgument:
return nil
}
}
log.Debug(err)
return nil
}
if r == nil || len(r.GetResults()) == 0 {
err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult,
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
Expand All @@ -145,80 +206,13 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa
}
log.Debug(err)
return nil
default:
r, err = f(sctx, vc, copts...)
if err != nil {
switch {
case errors.Is(err, context.Canceled),
errors.Is(err, errors.ErrRPCCallFailed(target, context.Canceled)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeCancelled(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " canceled: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, errors.ErrRPCCallFailed(target, context.DeadlineExceeded)):
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeDeadlineExceeded(
errdetails.ValdGRPCResourceTypePrefix +
"/vald.v1.search.BroadCast/" +
target + " deadline_exceeded: " + err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
default:
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse search gRPC error response",
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetStatus(trace.StatusError, err.Error())
}
switch st.Code() {
case codes.Internal,
codes.Unavailable,
codes.ResourceExhausted:
log.Warn(err)
return err
case codes.NotFound,
codes.Aborted,
codes.InvalidArgument:
return nil
}
}
log.Debug(err)
return nil
}
if r == nil || len(r.GetResults()) == 0 {
err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult,
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search",
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target),
})
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeNotFound(err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
log.Debug(err)
return nil
}
}
}
aggr.Send(sctx, r)
return nil
})
}))

<-ectx.Done() // Blocking here

err = eg.Wait()
}
aggr.Send(sctx, r)
return nil
})
cancel()
if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) {
err = status.WrapWithInternal("search API connection not found", err,
&errdetails.RequestInfo{
Expand All @@ -241,7 +235,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa
res.Results = res.GetResults()[:num]
}

if errors.Is(ectx.Err(), context.DeadlineExceeded) {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
if len(res.GetResults()) == 0 {
err = status.WrapWithDeadlineExceeded(
"error search result length is 0 due to the timeoutage limit",
Expand Down

0 comments on commit d91c758

Please sign in to comment.