Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job error to report index correction error status #2231

Merged
merged 11 commits into from
Nov 9, 2023
16 changes: 16 additions & 0 deletions internal/errors/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,24 @@
// ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1.
var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")

// ErrAgentReplicaOne represents an error that nothing to correct when agent replica is 1.
var ErrAgentReplicaOne = New("nothing to correct when agent replica is 1")

// ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica.
var ErrNoAvailableAgentToInsert = New("no available agent to insert replica")

// ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process.
var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process")

// ErrFailedToReceiveVectorFromStream represents an error that failed to receive vector from stream while index correction process.
var ErrFailedToReceiveVectorFromStream = New("failed to receive vector from stream")

// ErrFailedToCheckConsistency represents an error that failed to check consistency process while index correction process.
var ErrFailedToCheckConsistency = func(err error) error {
return Wrap(err, "failed to check consistency while index correctioin process")

Check warning on line 37 in internal/errors/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/errors/corrector.go#L36-L37

Added lines #L36 - L37 were not covered by tests
}

// ErrStreamListObjectStreamFinishedUnexpectedly represents an error that StreamListObject finished not because of io.EOF.
var ErrStreamListObjectStreamFinishedUnexpectedly = func(err error) error {
return Wrap(err, "stream list object stream finished unexpectedly")

Check warning on line 42 in internal/errors/corrector.go

View check run for this annotation

Codecov / codecov/patch

internal/errors/corrector.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}
12 changes: 12 additions & 0 deletions internal/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package errors

import (
"cmp"
"errors"
"fmt"
"reflect"
"runtime"
"slices"
"strings"

"github.com/vdaas/vald/internal/sync"
Expand Down Expand Up @@ -279,6 +281,16 @@ func Join(errs ...error) error {
return e
}

func RemoveDuplicates(errs []error) []error {
if len(errs) < 2 {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
return errs
}
slices.SortStableFunc(errs, func(l error, r error) int {
return cmp.Compare(l.Error(), r.Error())
})
return slices.CompactFunc(errs, Is)
}

type joinError struct {
errs []error
}
Expand Down
68 changes: 68 additions & 0 deletions internal/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,74 @@ func TestAs(t *testing.T) {
}
}

func TestRemoveDuplicates(t *testing.T) {
type args struct {
errs []error
}
tests := []struct {
name string
args args
want []error
}{
{
name: "succeeds to remove duplicated errors",
args: args{
errs: []error{
New("same error1"),
New("same error1"),
New("same error2"),
New("same error2"),
New("same error2"),
New("same error3"),
},
},
want: []error{
New("same error1"),
New("same error2"),
New("same error3"),
},
},
{
name: "single error remains the same",
args: args{
errs: []error{
New("same error"),
},
},
want: []error{
New("same error"),
},
},
{
name: "empty errs remains the same",
args: args{
errs: []error{},
},
want: []error{},
},
}

equalErrs := func(errs1, errs2 []error) bool {
if len(errs1) != len(errs2) {
return false
}
for i := range errs1 {
if !Is(errs1[i], errs2[i]) {
return false
}
}
return true
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RemoveDuplicates(tt.args.errs); !equalErrs(got, tt.want) {
t.Errorf("removeDuplicatedErrs() = %v, want %v", got, tt.want)
}
})
}
}

// NOT IMPLEMENTED BELOW
//
// func TestUnwrap(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package grpc

import (
"cmp"
"context"
"fmt"
"runtime"
"slices"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
Expand Down Expand Up @@ -75,9 +73,7 @@ func BidirectionalStream[Q any, R any](ctx context.Context, stream ServerStream,
errs = append(errs, err)
emu.Unlock()
}
removeDuplicates(errs, func(left, right error) int {
return cmp.Compare(left.Error(), right.Error())
})
errs := errors.RemoveDuplicates(errs)
emu.Lock()
err = errors.Join(errs...)
emu.Unlock()
Expand Down Expand Up @@ -229,11 +225,3 @@ func BidirectionalStreamClient(stream ClientStream,
}
}()
}

func removeDuplicates[S ~[]E, E comparable](x S, less func(left, right E) int) S {
if len(x) < 2 {
return x
}
slices.SortStableFunc(x, less)
return slices.Compact(x)
}
50 changes: 27 additions & 23 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
)

type Corrector interface {
Start(ctx context.Context) (<-chan error, error)
Start(ctx context.Context) error
StartClient(ctx context.Context) (<-chan error, error)
PreStop(ctx context.Context) error
// For metrics
NumberOfCheckedIndex() uint64
Expand Down Expand Up @@ -89,15 +90,14 @@
}, nil
}

func (c *correct) Start(ctx context.Context) (<-chan error, error) {
func (c *correct) StartClient(ctx context.Context) (<-chan error, error) {
return c.discoverer.Start(ctx)

Check warning on line 94 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}

func (c *correct) Start(ctx context.Context) error {

Check warning on line 97 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L97

Added line #L97 was not covered by tests
// set current time to context
ctx = embedTime(ctx)

dech, err := c.discoverer.Start(ctx)
if err != nil {
return nil, err
}

// addrs is sorted by the memory usage of each agent(descending order)
// this is decending because it's supposed to be used for index manager to decide
// which pod to make a create index rpc(higher memory, first to commit)
Expand All @@ -106,12 +106,12 @@

if l := len(c.agentAddrs); l <= 1 {
log.Warn("only %d agent found, there must be more than two agents for correction to happen", l)
return nil, err
return errors.ErrAgentReplicaOne

Check warning on line 109 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L109

Added line #L109 was not covered by tests
}

err = c.loadInfos(ctx)
err := c.loadInfos(ctx)

Check warning on line 112 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L112

Added line #L112 was not covered by tests
if err != nil {
return nil, err
return err

Check warning on line 114 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L114

Added line #L114 was not covered by tests
}

c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
Expand All @@ -122,11 +122,11 @@
log.Info("starting correction with bbolt disk cache...")
if err := c.correct(ctx); err != nil {
log.Errorf("there's some errors while correction: %v", err)
return nil, err
return err

Check warning on line 125 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L125

Added line #L125 was not covered by tests
}
log.Info("correction finished successfully")

return dech, nil
return nil

Check warning on line 129 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L129

Added line #L129 was not covered by tests
}

func (c *correct) PreStop(_ context.Context) error {
Expand Down Expand Up @@ -161,11 +161,16 @@
}

curTargetAgent := 0
jobErrs := make([]error, 0, c.cfg.Corrector.StreamListConcurrency)

Check warning on line 164 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L164

Added line #L164 was not covered by tests
if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) {

Check warning on line 166 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L166

Added line #L166 was not covered by tests
// current address is the leftAgentAddrs[0] because this is OrderedRange and
// leftAgentAddrs is copied from c.agentAddrs
defer func() {
if err != nil {
// catch the err that happened in this scope using named return err
jobErrs = append(jobErrs, err)
}

Check warning on line 173 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L170-L173

Added lines #L170 - L173 were not covered by tests
curTargetAgent++
}()

Expand All @@ -181,7 +186,6 @@
bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency()
bolteg.SetLimit(bconcurrency)

var mu sync.Mutex
log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency)

vc := vald.NewValdClient(conn)
Expand All @@ -190,6 +194,7 @@
return err
}

var mu sync.Mutex

Check warning on line 197 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L197

Added line #L197 was not covered by tests
// The number of items to be received in advance is not known in advance.
// This is because there is a possibility of new items being inserted during processing.
for {
Expand Down Expand Up @@ -230,16 +235,14 @@
return nil
}
if err != nil {
log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
return err
return errors.ErrStreamListObjectStreamFinishedUnexpectedly(err)

Check warning on line 238 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L238

Added line #L238 was not covered by tests
}

vec := res.GetVector()
if vec == nil {
st := res.GetStatus()
log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
// continue
return nil
return errors.ErrFailedToReceiveVectorFromStream

Check warning on line 245 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L245

Added line #L245 was not covered by tests
}

// skip if the vector is inserted after correction start
Expand All @@ -256,7 +259,7 @@
id := vec.GetId()
_, ok, err := c.checkedID.Get([]byte(id))
if err != nil {
log.Errorf("failed to perform Get from bbolt: %v", err)
log.Errorf("failed to perform Get from bbolt but still try to finish processing without cache: %v", err)

Check warning on line 262 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L262

Added line #L262 was not covered by tests
}
if ok {
// already checked index
Expand All @@ -271,8 +274,7 @@
},
curTargetAgent,
); err != nil {
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
return errors.ErrFailedToCheckConsistency(err)

Check warning on line 277 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L277

Added line #L277 was not covered by tests
}

// now this id is checked so set it to the disk cache
Expand All @@ -285,11 +287,13 @@
}
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
// This only happnes when ErrGRPCClientConnNotFound is returned.
// In other cases, OrderedRange continues processing, so jobErrrs is used to keep track of the error status of correction.

Check warning on line 291 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L290-L291

Added lines #L290 - L291 were not covered by tests
return err
}

return nil
jobErrs = errors.RemoveDuplicates(jobErrs)
return errors.Join(jobErrs...)

Check warning on line 296 in pkg/index/job/correction/service/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/service/corrector.go#L295-L296

Added lines #L295 - L296 were not covered by tests
}

type vectorReplica struct {
Expand Down
18 changes: 12 additions & 6 deletions pkg/index/job/correction/usecase/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,19 @@
func (r *run) Start(ctx context.Context) (<-chan error, error) {
log.Info("starting servers")
ech := make(chan error, 3) //nolint:gomnd
var oech, nech, sech <-chan error
var oech <-chan error
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech := r.server.ListenAndServe(ctx)
nech, err := r.corrector.StartClient(ctx)
if err != nil {
close(ech)
return nil, err
}

Check warning on line 157 in pkg/index/job/correction/usecase/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/usecase/corrector.go#L148-L157

Added lines #L148 - L157 were not covered by tests

r.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
if r.observability != nil {
oech = r.observability.Start(ctx)
}
sech = r.server.ListenAndServe(ctx)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -189,7 +195,7 @@
}()

start := time.Now()
_, err = r.corrector.Start(ctx)
err = r.corrector.Start(ctx)

Check warning on line 198 in pkg/index/job/correction/usecase/corrector.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/correction/usecase/corrector.go#L198

Added line #L198 was not covered by tests
if err != nil {
log.Errorf("index correction process failed: %v", err)
return err
Expand Down
Loading