diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go index 5391f3591a..757c66820e 100644 --- a/internal/errors/corrector.go +++ b/internal/errors/corrector.go @@ -20,24 +20,8 @@ package errors // ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1. var ErrIndexReplicaOne = New("nothing to correct when index replica is 1") -// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1. -var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1") - // ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica. var ErrNoAvailableAgentToInsert = New("no available agent to insert replica") // ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process. var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process") - -// ErrFailedToReceiveVectorFromStream represents an error that failed to receive vector from stream while index correction process. -var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream") - -// ErrFailedToCheckConsistency represents an error that failed to check consistency process while index correction process. -var ErrFailedToCheckConsistency = func(err error) error { - return Wrap(err, "failed to check consistency while index correctioin process") -} - -// ErrStreamListObjectStreamFinishedUnexpectedly represents an error that StreamListObject finished not because of io.EOF. -var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error { - return Wrap(err, "stream list object stream finished unexpectedly") -} diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 0e6e5b6b43..3fd58dc0ec 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -18,12 +18,10 @@ package errors import ( - "cmp" "errors" "fmt" "reflect" "runtime" - "slices" "strings" "github.com/vdaas/vald/internal/sync" @@ -281,16 +279,6 @@ func Join(errs ...error) error { return e } -func RemoveDuplicates(errs []error) []error { - if len(errs) < 2 { - return errs - } - slices.SortStableFunc(errs, func(l error, r error) int { - return cmp.Compare(l.Error(), r.Error()) - }) - return slices.CompactFunc(errs, Is) -} - type joinError struct { errs []error } diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go index 32f7e99bef..8a6b70e747 100644 --- a/internal/errors/errors_test.go +++ b/internal/errors/errors_test.go @@ -1581,74 +1581,6 @@ func TestAs(t *testing.T) { } } -func TestRemoveDuplicates(t *testing.T) { - type args struct { - errs []error - } - tests := []struct { - name string - args args - want []error - }{ - { - name: "succeeds to remove duplicated errors", - args: args{ - errs: []error{ - New("same error1"), - New("same error1"), - New("same error2"), - New("same error2"), - New("same error2"), - New("same error3"), - }, - }, - want: []error{ - New("same error1"), - New("same error2"), - New("same error3"), - }, - }, - { - name: "single error remains the same", - args: args{ - errs: []error{ - New("same error"), - }, - }, - want: []error{ - New("same error"), - }, - }, - { - name: "empty errs remains the same", - args: args{ - errs: []error{}, - }, - want: []error{}, - }, - } - - equalErrs := func(errs1, errs2 []error) bool { - if len(errs1) != len(errs2) { - return false - } - for i := range errs1 { - if !Is(errs1[i], errs2[i]) { - return false - } - } - return true - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) { - t.Errorf("removeDuplicatedErrs() = %v, want %v", got, tt.want) - } - }) - } -} - // NOT IMPLEMENTED BELOW // // func TestUnwrap(t *testing.T) { diff --git a/internal/net/grpc/stream.go b/internal/net/grpc/stream.go index 15ebbbd755..4dc6e56de3 100644 --- a/internal/net/grpc/stream.go +++ b/internal/net/grpc/stream.go @@ -18,9 +18,11 @@ package grpc import ( + "cmp" "context" "fmt" "runtime" + "slices" "sync/atomic" "github.com/vdaas/vald/internal/errors" @@ -73,7 +75,9 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream, errs = append(errs, err) emu.Unlock() } - errs := errors.RemoveDuplicates(errs) + removeDuplicates(errs, func(left, right error) int { + return cmp.Compare(left.Error(), right.Error()) + }) emu.Lock() err = errors.Join(errs...) emu.Unlock() @@ -225,3 +229,11 @@ func BidirectionalStreamClient(stream ClientStream, } }() } + +func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S { + if len(x) < 2 { + return x + } + slices.SortStableFunc(x, less) + return slices.Compact(x) +} diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 44f8b834be..9dfe5abb91 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -50,8 +50,7 @@ const ( ) type Corrector interface { - Start(ctx context.Context) error - StartClient(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) (<-chan error, error) PreStop(ctx context.Context) error // For metrics NumberOfCheckedIndex() uint64 @@ -90,14 +89,15 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { }, nil } -func (c *correct) StartClient(ctx context.Context) (<-chan error, error) { - return c.discoverer.Start(ctx) -} - -func (c *correct) Start(ctx context.Context) error { +func (c *correct) Start(ctx context.Context) (<-chan error, error) { // set current time to context ctx = embedTime(ctx) + dech, err := c.discoverer.Start(ctx) + if err != nil { + return nil, err + } + // addrs is sorted by the memory usage of each agent(descending order) // this is decending because it's supposed to be used for index manager to decide // which pod to make a create index rpc(higher memory, first to commit) @@ -106,12 +106,12 @@ func (c *correct) Start(ctx context.Context) error { if l := len(c.agentAddrs); l <= 1 { log.Warn("only %d agent found, there must be more than two agents for correction to happen", l) - return errors.ErrAgentReplicaOne + return nil, err } - err := c.loadInfos(ctx) + err = c.loadInfos(ctx) if err != nil { - return err + return nil, err } c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool { @@ -122,11 +122,11 @@ func (c *correct) Start(ctx context.Context) error { log.Info("starting correction with bbolt disk cache...") if err := c.correct(ctx); err != nil { log.Errorf("there's some errors while correction: %v", err) - return err + return nil, err } log.Info("correction finished successfully") - return nil + return dech, nil } func (c *correct) PreStop(_ context.Context) error { @@ -161,16 +161,11 @@ func (c *correct) correct(ctx context.Context) (err error) { } curTargetAgent := 0 - jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { + func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { // current address is the leftAgentAddrs[0] because this is OrderedRange and // leftAgentAddrs is copied from c.agentAddrs defer func() { - if err != nil { - // catch the err that happened in this scope using named return err - jobErrs = append(jobErrs, err) - } curTargetAgent++ }() @@ -186,6 +181,7 @@ func (c *correct) correct(ctx context.Context) (err error) { bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() bolteg.SetLimit(bconcurrency) + var mu sync.Mutex log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency) vc := vald.NewValdClient(conn) @@ -194,7 +190,6 @@ func (c *correct) correct(ctx context.Context) (err error) { return err } - var mu sync.Mutex // The number of items to be received in advance is not known in advance. // This is because there is a possibility of new items being inserted during processing. for { @@ -235,14 +230,16 @@ func (c *correct) correct(ctx context.Context) (err error) { return nil } if err != nil { - return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err) + log.Errorf("StreamListObject stream finished unexpectedly: %v", err) + return err } vec := res.GetVector() if vec == nil { st := res.GetStatus() log.Error(st.GetCode(), st.GetMessage(), st.GetDetails()) - return errors.ErrFailedToReceiveVectorFromStream + // continue + return nil } // skip if the vector is inserted after correction start @@ -259,7 +256,7 @@ func (c *correct) correct(ctx context.Context) (err error) { id := vec.GetId() _, ok, err := c.checkedID.Get([]byte(id)) if err != nil { - log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err) + log.Errorf("failed to perform Get from bbolt: %v", err) } if ok { // already checked index @@ -274,7 +271,8 @@ func (c *correct) correct(ctx context.Context) (err error) { }, curTargetAgent, ); err != nil { - return errors.ErrFailedToCheckConsistency(err) + log.Errorf("failed to check consistency: %v", err) + return nil // continue other processes } // now this id is checked so set it to the disk cache @@ -287,13 +285,11 @@ func (c *correct) correct(ctx context.Context) (err error) { } }, ); err != nil { - // This only happnes when ErrGRPCClientConnNotFound is returned. - // In other cases, OrderedRange continues processing, so jobErrrs is used to keep track of the error status of correction. + log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err) return err } - jobErrs = errors.RemoveDuplicates(jobErrs) - return errors.Join(jobErrs...) + return nil } type vectorReplica struct { diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 896165f6aa..44907b9c86 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -145,19 +145,13 @@ func (r *run) PreStart(ctx context.Context) error { func (r *run) Start(ctx context.Context) (<-chan error, error) { log.Info("starting servers") ech := make(chan error, 3) //nolint:gomnd - var oech <-chan error - if r.observability != nil { - oech = r.observability.Start(ctx) - } - sech := r.server.ListenAndServe(ctx) - nech, err := r.corrector.StartClient(ctx) - if err != nil { - close(ech) - return nil, err - } - + var oech, nech, sech <-chan error r.eg.Go(safety.RecoverFunc(func() (err error) { defer close(ech) + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech = r.server.ListenAndServe(ctx) for { select { case <-ctx.Done(): @@ -195,7 +189,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { }() start := time.Now() - err = r.corrector.Start(ctx) + _, err = r.corrector.Start(ctx) if err != nil { log.Errorf("index correction process failed: %v", err) return err