Skip to content

Commit

Permalink
Add timestamp check
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Sep 26, 2023
1 parent df88705 commit 8b0540d
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"path/filepath"
"slices"
"sync/atomic"
"time"

agent "github.com/vdaas/vald/apis/grpc/v1/agent/core"
"github.com/vdaas/vald/apis/grpc/v1/payload"
Expand All @@ -39,10 +40,13 @@ import (
"github.com/vdaas/vald/pkg/index/job/correction/config"
)

type contextTimeKey string

const (
insertMethod = "core.v1.Vald/Insert"
updateMethod = "core.v1.Vald/Update"
deleteMethod = "core.v1.Vald/Delete"
insertMethod = "core.v1.Vald/Insert"
updateMethod = "core.v1.Vald/Update"
deleteMethod = "core.v1.Vald/Delete"
correctionStartTimeKey contextTimeKey = "correctionStartTimeKey"
)

type Corrector interface {
Expand Down Expand Up @@ -79,6 +83,9 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) {
}

func (c *correct) Start(ctx context.Context) (<-chan error, error) {
// set current time to context
ctx = embedTime(ctx)

dech, err := c.discoverer.Start(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -135,6 +142,13 @@ func (c *correct) correct(ctx context.Context) (err error) {
return fmt.Errorf("failed to copy agentAddrs")
}

// Vector with time after this should not be processed
correctionStartTime, err := getCorrectionStartTime(ctx)
if err != nil {
log.Errorf("cannot determine correction start time: %w", err)
return err
}

if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
// current address is the leftAgentAddrs[0] because this is OrderedRange and
Expand Down Expand Up @@ -182,7 +196,6 @@ func (c *correct) correct(ctx context.Context) (err error) {
mu.Unlock()

if errors.Is(err, io.EOF) {
log.Debugf("StreamListObject stream finished for agent %s", addr)
scancel()
return nil
}
Expand All @@ -191,17 +204,26 @@ func (c *correct) correct(ctx context.Context) (err error) {
return err
}

if res.GetVector() == nil {
vec := res.GetVector()
if vec == nil {
st := res.GetStatus()
log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
// continue
return nil
}

log.Debugf("received object in StreamListObject: agent(%s), id(%s), timestamp(%v)", addr, res.GetVector().GetId(), res.GetVector().GetTimestamp())
// skip if the vector is inserted after correction start
if vec.GetTimestamp() > correctionStartTime.UnixNano() {
log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...",
vec.GetId(),
vec.GetTimestamp(),
correctionStartTime.UnixNano(),
)
return nil
}

// check if the index is already checked
id := res.GetVector().GetId()
id := vec.GetId()
_, ok, err := c.checkedID.Get([]byte(id))
if err != nil {
log.Errorf("failed to perform Get from bbolt: %v", err)
Expand All @@ -215,7 +237,7 @@ func (c *correct) correct(ctx context.Context) (err error) {
ctx,
&vectorReplica{
addr: addr,
vec: res.GetVector(),
vec: vec,
},
leftAgentAddrs,
); err != nil {
Expand Down Expand Up @@ -271,6 +293,13 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
}
}

// Vector with time after this should not be processed
correctionStartTime, err := getCorrectionStartTime(ctx)
if err != nil {
log.Errorf("cannot determine correction start time: %w", err)
return err
}

foundReplicas := make([]*vectorReplica, 0, len(availableAddrs))
var mu sync.Mutex
if err := c.discoverer.GetClient().OrderedRangeConcurrent(ctx, leftAgentAddrs, len(leftAgentAddrs),
Expand All @@ -286,7 +315,7 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
default:
}
vc := vald.NewValdClient(conn)
v, err := vc.GetObject(ctx, &payload.Object_VectorRequest{
vec, err := vc.GetObject(ctx, &payload.Object_VectorRequest{
Id: &payload.Object_ID{
Id: targetReplica.vec.GetId(),
},
Expand All @@ -304,13 +333,20 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep
}
}

// the target replica is found in this agent with the addr
log.Debugf("object found: agent(%s), id(%v), timestamp(%v)", addr, v.GetId(), v.GetTimestamp())
// skip if the vector is inserted after correction start
if vec.GetTimestamp() > correctionStartTime.UnixNano() {
log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...",
vec.GetId(),
vec.GetTimestamp(),
correctionStartTime.UnixNano(),
)
return nil
}

mu.Lock()
foundReplicas = append(foundReplicas, &vectorReplica{
addr: addr,
vec: v,
vec: vec,
})

// Remove this addr from availableAddrs because this addr has the target replica
Expand Down Expand Up @@ -554,3 +590,19 @@ func (c *correct) loadInfos(ctx context.Context) (err error) {
})
return nil
}

func embedTime(ctx context.Context) context.Context {
v := ctx.Value(correctionStartTimeKey)
if _, ok := v.(time.Time); ok {
return ctx
}
return context.WithValue(ctx, correctionStartTimeKey, time.Now())
}

func getCorrectionStartTime(ctx context.Context) (time.Time, error) {
v := ctx.Value(correctionStartTimeKey)
if t, ok := v.(time.Time); ok {
return t, nil
}
return time.Time{}, fmt.Errorf("timeKey is not embeded in context")
}

0 comments on commit 8b0540d

Please sign in to comment.