From 9e427d3b82784c31102077efb3a761f250ce6cf2 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 7 Nov 2023 06:28:41 +0000 Subject: [PATCH 1/8] Fix to monitor discoverer error for index correction --- internal/errors/corrector.go | 3 +++ pkg/index/job/correction/service/corrector.go | 24 +++++++++++-------- pkg/index/job/correction/usecase/corrector.go | 18 +++++++++----- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go index 757c66820e..234eb04ebd 100644 --- a/internal/errors/corrector.go +++ b/internal/errors/corrector.go @@ -20,6 +20,9 @@ package errors // ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1. var ErrIndexReplicaOne = New("nothing to correct when index replica is 1") +// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1. +var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1") + // ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica. 
var ErrNoAvailableAgentToInsert = New("no available agent to insert replica") diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 9dfe5abb91..90935d8692 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -50,7 +50,8 @@ const ( ) type Corrector interface { - Start(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error + PreStart(ctx context.Context) (<-chan error, error) PreStop(ctx context.Context) error // For metrics NumberOfCheckedIndex() uint64 @@ -89,14 +90,17 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { }, nil } -func (c *correct) Start(ctx context.Context) (<-chan error, error) { - // set current time to context - ctx = embedTime(ctx) - +func (c *correct) PreStart(ctx context.Context) (<-chan error, error) { dech, err := c.discoverer.Start(ctx) if err != nil { return nil, err } + return dech, nil +} + +func (c *correct) Start(ctx context.Context) error { + // set current time to context + ctx = embedTime(ctx) // addrs is sorted by the memory usage of each agent(descending order) // this is decending because it's supposed to be used for index manager to decide @@ -106,12 +110,12 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) { if l := len(c.agentAddrs); l <= 1 { log.Warn("only %d agent found, there must be more than two agents for correction to happen", l) - return nil, err + return errors.ErrAgentReplicaOne } - err = c.loadInfos(ctx) + err := c.loadInfos(ctx) if err != nil { - return nil, err + return err } c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool { @@ -122,11 +126,11 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) { log.Info("starting correction with bbolt disk cache...") if err := c.correct(ctx); err != nil { log.Errorf("there's some errors while correction: %v", err) - return nil, err + return err } 
log.Info("correction finished successfully") - return dech, nil + return nil } func (c *correct) PreStop(_ context.Context) error { diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 44907b9c86..8b218efc30 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -145,13 +145,19 @@ func (r *run) PreStart(ctx context.Context) error { func (r *run) Start(ctx context.Context) (<-chan error, error) { log.Info("starting servers") ech := make(chan error, 3) //nolint:gomnd - var oech, nech, sech <-chan error + var oech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech := r.server.ListenAndServe(ctx) + nech, err := r.corrector.PreStart(ctx) + if err != nil { + close(ech) + return nil, err + } + r.eg.Go(safety.RecoverFunc(func() (err error) { defer close(ech) - if r.observability != nil { - oech = r.observability.Start(ctx) - } - sech = r.server.ListenAndServe(ctx) for { select { case <-ctx.Done(): @@ -189,7 +195,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { }() start := time.Now() - _, err = r.corrector.Start(ctx) + err = r.corrector.Start(ctx) if err != nil { log.Errorf("index correction process failed: %v", err) return err From 3ff5451853c7e37cf100fe88e1a27b91e9a6cbe3 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 7 Nov 2023 08:35:00 +0000 Subject: [PATCH 2/8] Add errors.RemoveDuplicates --- internal/errors/errors.go | 14 +++++++ internal/errors/errors_test.go | 68 ++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 3fd58dc0ec..13f7ecbdb1 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -18,10 +18,12 @@ package errors import ( + "cmp" "errors" "fmt" "reflect" "runtime" + "slices" "strings" "github.com/vdaas/vald/internal/sync" @@ -279,6 +281,18 @@ func Join(errs ...error) 
error { return e } +func RemoveDuplicates(errs []error) []error { + if len(errs) < 2 { + return errs + } + slices.SortStableFunc(errs, func(l error, r error) int { + return cmp.Compare(l.Error(), r.Error()) + }) + return slices.CompactFunc(errs, func(l error, r error) bool { + return Is(l, r) + }) +} + type joinError struct { errs []error } diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go index 8a6b70e747..32f7e99bef 100644 --- a/internal/errors/errors_test.go +++ b/internal/errors/errors_test.go @@ -1581,6 +1581,74 @@ func TestAs(t *testing.T) { } } +func TestRemoveDuplicates(t *testing.T) { + type args struct { + errs []error + } + tests := []struct { + name string + args args + want []error + }{ + { + name: "succeeds to remove duplicated errors", + args: args{ + errs: []error{ + New("same error1"), + New("same error1"), + New("same error2"), + New("same error2"), + New("same error2"), + New("same error3"), + }, + }, + want: []error{ + New("same error1"), + New("same error2"), + New("same error3"), + }, + }, + { + name: "single error remains the same", + args: args{ + errs: []error{ + New("same error"), + }, + }, + want: []error{ + New("same error"), + }, + }, + { + name: "empty errs remains the same", + args: args{ + errs: []error{}, + }, + want: []error{}, + }, + } + + equalErrs := func(errs1, errs2 []error) bool { + if len(errs1) != len(errs2) { + return false + } + for i := range errs1 { + if !Is(errs1[i], errs2[i]) { + return false + } + } + return true + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) { + t.Errorf("removeDuplicatedErrs() = %v, want %v", got, tt.want) + } + }) + } +} + // NOT IMPLEMENTED BELOW // // func TestUnwrap(t *testing.T) { From 18ca1b3b5f4ab7e6bcbe84df716da857a869f3f2 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 7 Nov 2023 08:36:00 +0000 Subject: [PATCH 3/8] Add jobErrs to report job status of index correction 
--- internal/errors/corrector.go | 13 ++++++++++ internal/net/grpc/stream.go | 14 +--------- pkg/index/job/correction/service/corrector.go | 26 ++++++++++++------- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go index 234eb04ebd..5391f3591a 100644 --- a/internal/errors/corrector.go +++ b/internal/errors/corrector.go @@ -28,3 +28,16 @@ var ErrNoAvailableAgentToInsert = New("no available agent to insert replica") // ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process. var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process") + +// ErrFailedToReceiveVectorFromStream represents an error that failed to receive vector from stream while index correction process. +var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream") + +// ErrFailedToCheckConsistency represents an error that failed to check consistency process while index correction process. +var ErrFailedToCheckConsistency = func(err error) error { + return Wrap(err, "failed to check consistency while index correctioin process") +} + +// ErrStreamListObjectStreamFinishedUnexpectedly represents an error that StreamListObject finished not because of io.EOF. 
+var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error { + return Wrap(err, "stream list object stream finished unexpectedly") +} diff --git a/internal/net/grpc/stream.go b/internal/net/grpc/stream.go index 4dc6e56de3..15ebbbd755 100644 --- a/internal/net/grpc/stream.go +++ b/internal/net/grpc/stream.go @@ -18,11 +18,9 @@ package grpc import ( - "cmp" "context" "fmt" "runtime" - "slices" "sync/atomic" "github.com/vdaas/vald/internal/errors" @@ -75,9 +73,7 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream, errs = append(errs, err) emu.Unlock() } - removeDuplicates(errs, func(left, right error) int { - return cmp.Compare(left.Error(), right.Error()) - }) + errs := errors.RemoveDuplicates(errs) emu.Lock() err = errors.Join(errs...) emu.Unlock() @@ -229,11 +225,3 @@ func BidirectionalStreamClient(stream ClientStream, } }() } - -func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S { - if len(x) < 2 { - return x - } - slices.SortStableFunc(x, less) - return slices.Compact(x) -} diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 90935d8692..6e77a106a0 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -165,6 +165,7 @@ func (c *correct) correct(ctx context.Context) (err error) { } curTargetAgent := 0 + jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { // current address is the leftAgentAddrs[0] because this is OrderedRange and @@ -185,15 +186,16 @@ func (c *correct) correct(ctx context.Context) (err error) { bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() bolteg.SetLimit(bconcurrency) - var mu sync.Mutex log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt 
concurrency: %d", addr, sconcurrency, bconcurrency) vc := vald.NewValdClient(conn) stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{}) if err != nil { + jobErrs = append(jobErrs, err) return err } + var mu sync.Mutex // The number of items to be received in advance is not known in advance. // This is because there is a possibility of new items being inserted during processing. for { @@ -217,6 +219,11 @@ func (c *correct) correct(ctx context.Context) (err error) { log.Info("bbolt all batch finished") } + // Aggregate errors for job status + if err != nil { + jobErrs = append(jobErrs, err) + } + log.Infof("correction finished for agent %s", addr) return err @@ -234,16 +241,14 @@ func (c *correct) correct(ctx context.Context) (err error) { return nil } if err != nil { - log.Errorf("StreamListObject stream finished unexpectedly: %v", err) - return err + return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err) } vec := res.GetVector() if vec == nil { st := res.GetStatus() log.Error(st.GetCode(), st.GetMessage(), st.GetDetails()) - // continue - return nil + return errors.ErrFailedToReceiveVectorFromStream } // skip if the vector is inserted after correction start @@ -260,7 +265,7 @@ func (c *correct) correct(ctx context.Context) (err error) { id := vec.GetId() _, ok, err := c.checkedID.Get([]byte(id)) if err != nil { - log.Errorf("failed to perform Get from bbolt: %v", err) + log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err) } if ok { // already checked index @@ -275,8 +280,7 @@ func (c *correct) correct(ctx context.Context) (err error) { }, curTargetAgent, ); err != nil { - log.Errorf("failed to check consistency: %v", err) - return nil // continue other processes + return errors.ErrFailedToCheckConsistency(err) } // now this id is checked so set it to the disk cache @@ -289,11 +293,13 @@ func (c *correct) correct(ctx context.Context) (err error) { } }, ); err != nil { - 
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err) + // This only happens when ErrGRPCClientConnNotFound is returned. + // In other cases, OrderedRange continues processing, so jobErrs is used to keep track of the error status of correction. return err } - return nil + jobErrs = errors.RemoveDuplicates(jobErrs) + return errors.Join(jobErrs...) } type vectorReplica struct { From c59ba5623f328feb416960f2ce82bf3fdf4c9a45 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 8 Nov 2023 01:51:55 +0000 Subject: [PATCH 4/8] Simplify function literal --- internal/errors/errors.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 13f7ecbdb1..0e6e5b6b43 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -288,9 +288,7 @@ func RemoveDuplicates(errs []error) []error { slices.SortStableFunc(errs, func(l error, r error) int { return cmp.Compare(l.Error(), r.Error()) }) - return slices.CompactFunc(errs, func(l error, r error) bool { - return Is(l, r) - }) + return slices.CompactFunc(errs, Is) } type joinError struct { From a214c1e93a147c21fc2b255f5e950c1fd9df3ee6 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 8 Nov 2023 02:22:37 +0000 Subject: [PATCH 5/8] Update Corrector interface method name to StartMonitoring for clarity --- pkg/index/job/correction/service/corrector.go | 4 ++-- pkg/index/job/correction/usecase/corrector.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 6e77a106a0..8ac271da58 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -51,7 +51,7 @@ const ( type Corrector interface { Start(ctx context.Context) error - PreStart(ctx context.Context) (<-chan error, error) + StartMonitoring(ctx context.Context) (<-chan error, error) PreStop(ctx context.Context) error // 
For metrics NumberOfCheckedIndex() uint64 @@ -90,7 +90,7 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { }, nil } -func (c *correct) PreStart(ctx context.Context) (<-chan error, error) { +func (c *correct) StartMonitoring(ctx context.Context) (<-chan error, error) { dech, err := c.discoverer.Start(ctx) if err != nil { return nil, err diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index 8b218efc30..ec823e8b88 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -150,7 +150,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { oech = r.observability.Start(ctx) } sech := r.server.ListenAndServe(ctx) - nech, err := r.corrector.PreStart(ctx) + nech, err := r.corrector.StartMonitoring(ctx) if err != nil { close(ech) return nil, err From d2a24970f6db25f5d39766a9f5bbf02a22019a43 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 8 Nov 2023 04:13:31 +0000 Subject: [PATCH 6/8] Removed unnecessary error handling --- pkg/index/job/correction/service/corrector.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 8ac271da58..c0ee6ea134 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -91,11 +91,7 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { } func (c *correct) StartMonitoring(ctx context.Context) (<-chan error, error) { - dech, err := c.discoverer.Start(ctx) - if err != nil { - return nil, err - } - return dech, nil + return c.discoverer.Start(ctx) } func (c *correct) Start(ctx context.Context) error { From fae30a04698a68ce53eb57d2cb8a2e2b074d356b Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 8 Nov 2023 04:15:23 +0000 Subject: [PATCH 7/8] Update Corrector interface method name to 
StartClient --- pkg/index/job/correction/service/corrector.go | 4 ++-- pkg/index/job/correction/usecase/corrector.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index c0ee6ea134..ab67520d03 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -51,7 +51,7 @@ const ( type Corrector interface { Start(ctx context.Context) error - StartMonitoring(ctx context.Context) (<-chan error, error) + StartClient(ctx context.Context) (<-chan error, error) PreStop(ctx context.Context) error // For metrics NumberOfCheckedIndex() uint64 @@ -90,7 +90,7 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { }, nil } -func (c *correct) StartMonitoring(ctx context.Context) (<-chan error, error) { +func (c *correct) StartClient(ctx context.Context) (<-chan error, error) { return c.discoverer.Start(ctx) } diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go index ec823e8b88..896165f6aa 100644 --- a/pkg/index/job/correction/usecase/corrector.go +++ b/pkg/index/job/correction/usecase/corrector.go @@ -150,7 +150,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { oech = r.observability.Start(ctx) } sech := r.server.ListenAndServe(ctx) - nech, err := r.corrector.StartMonitoring(ctx) + nech, err := r.corrector.StartClient(ctx) if err != nil { close(ech) return nil, err From 5f0a887866d209f86676902a92bdc0f845dcaa82 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 8 Nov 2023 05:33:00 +0000 Subject: [PATCH 8/8] Fix error handling in corrector.go to catch all the err in defer function --- pkg/index/job/correction/service/corrector.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index 
ab67520d03..44f8b834be 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -163,10 +163,14 @@ func (c *correct) correct(ctx context.Context) (err error) { curTargetAgent := 0 jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency) if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { + func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { // current address is the leftAgentAddrs[0] because this is OrderedRange and // leftAgentAddrs is copied from c.agentAddrs defer func() { + if err != nil { + // catch the err that happened in this scope using named return err + jobErrs = append(jobErrs, err) + } curTargetAgent++ }() @@ -187,7 +191,6 @@ func (c *correct) correct(ctx context.Context) (err error) { vc := vald.NewValdClient(conn) stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{}) if err != nil { - jobErrs = append(jobErrs, err) return err } @@ -215,11 +218,6 @@ func (c *correct) correct(ctx context.Context) (err error) { log.Info("bbolt all batch finished") } - // Aggregate errors for job status - if err != nil { - jobErrs = append(jobErrs, err) - } - log.Infof("correction finished for agent %s", addr) return err