diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go index 757c66820e..5391f3591a 100644 --- a/internal/errors/corrector.go +++ b/internal/errors/corrector.go @@ -20,8 +20,24 @@ package errors // ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1. var ErrIndexReplicaOne = New("nothing to correct when index replica is 1") +// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1. +var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1") + // ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica. var ErrNoAvailableAgentToInsert = New("no available agent to insert replica") // ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process. var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process") + +// ErrFailedToReceiveVectorFromStream represents a failure to receive a vector from the stream during the index correction process. +var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream") + +// ErrFailedToCheckConsistency wraps err to report that the consistency check failed during the index correction process. +var ErrFailedToCheckConsistency = func(err error) error { + return Wrap(err, "failed to check consistency while index correction process") +} + +// ErrStreamListObjectStreamFinishedUnexpectedly wraps err to report that StreamListObject finished for a reason other than io.EOF. 
+var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error { + return Wrap(err, "stream list object stream finished unexpectedly") +} diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 3fd58dc0ec..0e6e5b6b43 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -18,10 +18,12 @@ package errors import ( + "cmp" "errors" "fmt" "reflect" "runtime" + "slices" "strings" "github.com/vdaas/vald/internal/sync" @@ -279,6 +281,18 @@ func Join(errs ...error) error { return e } +// RemoveDuplicates stably sorts errs in place by error message and returns it with adjacent errors that match according to Is compacted into one. +// The caller's slice is reordered and its backing array is reused; a slice with fewer than two elements is returned as is. +func RemoveDuplicates(errs []error) []error { + if len(errs) < 2 { + return errs + } + slices.SortStableFunc(errs, func(l error, r error) int { + return cmp.Compare(l.Error(), r.Error()) + }) + return slices.CompactFunc(errs, Is) +} + type joinError struct { errs []error } diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go index 8a6b70e747..32f7e99bef 100644 --- a/internal/errors/errors_test.go +++ b/internal/errors/errors_test.go @@ -1581,6 +1581,74 @@ func TestAs(t *testing.T) { } } +func TestRemoveDuplicates(t *testing.T) { + type args struct { + errs []error + } + tests := []struct { + name string + args args + want []error + }{ + { + name: "succeeds to remove duplicated errors", + args: args{ + errs: []error{ + New("same error1"), + New("same error1"), + New("same error2"), + New("same error2"), + New("same error2"), + New("same error3"), + }, + }, + want: []error{ + New("same error1"), + New("same error2"), + New("same error3"), + }, + }, + { + name: "single error remains the same", + args: args{ + errs: []error{ + New("same error"), + }, + }, + want: []error{ + New("same error"), + }, + }, + { + name: "empty errs remains the same", + args: args{ + errs: []error{}, + }, + want: []error{}, + }, + } + + equalErrs := func(errs1, errs2 []error) bool { + if len(errs1) != len(errs2) { + return false + } + for i := range errs1 { + if !Is(errs1[i], errs2[i]) { + return false + } + } + return true + } + + for _, tt := range 
tests { + t.Run(tt.name, func(t *testing.T) { + if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) { + t.Errorf("RemoveDuplicates() = %v, want %v", got, tt.want) + } + }) + } +} + // NOT IMPLEMENTED BELOW // // func TestUnwrap(t *testing.T) { diff --git a/internal/net/grpc/stream.go b/internal/net/grpc/stream.go index 4dc6e56de3..15ebbbd755 100644 --- a/internal/net/grpc/stream.go +++ b/internal/net/grpc/stream.go @@ -18,11 +18,9 @@ package grpc import ( - "cmp" "context" "fmt" "runtime" - "slices" "sync/atomic" "github.com/vdaas/vald/internal/errors" @@ -75,9 +73,7 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream, errs = append(errs, err) emu.Unlock() } - removeDuplicates(errs, func(left, right error) int { - return cmp.Compare(left.Error(), right.Error()) - }) + errs := errors.RemoveDuplicates(errs) emu.Lock() err = errors.Join(errs...) emu.Unlock() @@ -229,11 +225,3 @@ func BidirectionalStreamClient(stream ClientStream, } }() } - -func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S { - if len(x) < 2 { - return x - } - slices.SortStableFunc(x, less) - return slices.Compact(x) -} diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 9dfe5abb91..44f8b834be 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -50,7 +50,8 @@ const ( ) type Corrector interface { - Start(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error + StartClient(ctx context.Context) (<-chan error, error) PreStop(ctx context.Context) error // For metrics NumberOfCheckedIndex() uint64 @@ -89,15 +90,14 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { }, nil } -func (c *correct) Start(ctx context.Context) (<-chan error, error) { +func (c *correct) StartClient(ctx context.Context) (<-chan error, error) { + return c.discoverer.Start(ctx) +} + 
+func (c *correct) Start(ctx context.Context) error { // set current time to context ctx = embedTime(ctx) - dech, err := c.discoverer.Start(ctx) - if err != nil { - return nil, err - } - // addrs is sorted by the memory usage of each agent(descending order) // this is decending because it's supposed to be used for index manager to decide // which pod to make a create index rpc(higher memory, first to commit) @@ -106,12 +106,12 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) { if l := len(c.agentAddrs); l <= 1 { - log.Warn("only %d agent found, there must be more than two agents for correction to happen", l) - return nil, err + log.Warnf("only %d agent found, there must be more than two agents for correction to happen", l) + return errors.ErrAgentReplicaOne } - err = c.loadInfos(ctx) + err := c.loadInfos(ctx) if err != nil { - return nil, err + return err } c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool { @@ -122,11 +122,11 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) { log.Info("starting correction with bbolt disk cache...") if err := c.correct(ctx); err != nil { log.Errorf("there's some errors while correction: %v", err) - return nil, err + return err } log.Info("correction finished successfully") - return dech, nil + return nil } func (c *correct) PreStop(_ context.Context) error { @@ -161,11 +161,16 @@ func (c *correct) correct(ctx context.Context) (err error) { } curTargetAgent := 0 + jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { + func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { // current address is the leftAgentAddrs[0] because this is OrderedRange and // leftAgentAddrs is copied from c.agentAddrs defer func() { + if err != nil { + // catch the err that happened in this scope using named return err + jobErrs = append(jobErrs, err) + } 
curTargetAgent++ }() @@ -181,7 +186,6 @@ func (c *correct) correct(ctx context.Context) (err error) { bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() bolteg.SetLimit(bconcurrency) - var mu sync.Mutex log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency) vc := vald.NewValdClient(conn) @@ -190,6 +194,7 @@ func (c *correct) correct(ctx context.Context) (err error) { return err } + var mu sync.Mutex // The number of items to be received in advance is not known in advance. // This is because there is a possibility of new items being inserted during processing. for { @@ -230,16 +235,14 @@ func (c *correct) correct(ctx context.Context) (err error) { return nil } if err != nil { - log.Errorf("StreamListObject stream finished unexpectedly: %v", err) - return err + return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err) } vec := res.GetVector() if vec == nil { st := res.GetStatus() log.Error(st.GetCode(), st.GetMessage(), st.GetDetails()) - // continue - return nil + return errors.ErrFailedToReceiveVectorFromStream } // skip if the vector is inserted after correction start @@ -256,7 +259,7 @@ func (c *correct) correct(ctx context.Context) (err error) { id := vec.GetId() _, ok, err := c.checkedID.Get([]byte(id)) if err != nil { - log.Errorf("failed to perform Get from bbolt: %v", err) + log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err) } if ok { // already checked index @@ -271,8 +274,7 @@ func (c *correct) correct(ctx context.Context) (err error) { }, curTargetAgent, ); err != nil { - log.Errorf("failed to check consistency: %v", err) - return nil // continue other processes + return errors.ErrFailedToCheckConsistency(err) } // now this id is checked so set it to the disk cache @@ -285,11 +287,13 @@ func (c *correct) correct(ctx context.Context) (err error) { } }, ); err != nil { - log.Errorf("failed to range over 
agents(%v): %v", c.agentAddrs, err) + // This only happens when ErrGRPCClientConnNotFound is returned. + // In other cases, OrderedRange continues processing, so jobErrs is used to keep track of the error status of correction. return err } - return nil + jobErrs = errors.RemoveDuplicates(jobErrs) + return errors.Join(jobErrs...) } type vectorReplica struct { diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 44907b9c86..896165f6aa 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -145,13 +145,19 @@ func (r *run) PreStart(ctx context.Context) error { func (r *run) Start(ctx context.Context) (<-chan error, error) { log.Info("starting servers") ech := make(chan error, 3) //nolint:gomnd - var oech, nech, sech <-chan error + var oech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech := r.server.ListenAndServe(ctx) + nech, err := r.corrector.StartClient(ctx) + if err != nil { + close(ech) + return nil, err + } + r.eg.Go(safety.RecoverFunc(func() (err error) { defer close(ech) - if r.observability != nil { - oech = r.observability.Start(ctx) - } - sech = r.server.ListenAndServe(ctx) for { select { case <-ctx.Done(): @@ -189,7 +195,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { }() start := time.Now() - _, err = r.corrector.Start(ctx) + err = r.corrector.Start(ctx) if err != nil { log.Errorf("index correction process failed: %v", err) return err