From 8b0540dd76e19748e76db21e51c4e3c6e1f4cd5e Mon Sep 17 00:00:00 2001 From: ykadowak Date: Tue, 26 Sep 2023 02:22:15 +0000 Subject: [PATCH] Add timestamp check --- pkg/index/job/correction/service/corrector.go | 76 ++++++++++++++++--- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go index e57bdcaf0e..faa5b19d3a 100644 --- a/pkg/index/job/correction/service/corrector.go +++ b/pkg/index/job/correction/service/corrector.go @@ -22,6 +22,7 @@ import ( "path/filepath" "slices" "sync/atomic" + "time" agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -39,10 +40,13 @@ import ( "github.com/vdaas/vald/pkg/index/job/correction/config" ) +type contextTimeKey string + const ( - insertMethod = "core.v1.Vald/Insert" - updateMethod = "core.v1.Vald/Update" - deleteMethod = "core.v1.Vald/Delete" + insertMethod = "core.v1.Vald/Insert" + updateMethod = "core.v1.Vald/Update" + deleteMethod = "core.v1.Vald/Delete" + correctionStartTimeKey contextTimeKey = "correctionStartTimeKey" ) type Corrector interface { @@ -79,6 +83,9 @@ func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { } func (c *correct) Start(ctx context.Context) (<-chan error, error) { + // set current time to context + ctx = embedTime(ctx) + dech, err := c.discoverer.Start(ctx) if err != nil { return nil, err @@ -135,6 +142,13 @@ func (c *correct) correct(ctx context.Context) (err error) { return fmt.Errorf("failed to copy agentAddrs") } + // Vector with time after this should not be processed + correctionStartTime, err := getCorrectionStartTime(ctx) + if err != nil { + log.Errorf("cannot determine correction start time: %w", err) + return err + } + if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { // current address is the leftAgentAddrs[0] because this is OrderedRange and @@ -182,7 +196,6 @@ func (c *correct) correct(ctx context.Context) (err error) { mu.Unlock() if errors.Is(err, io.EOF) { - log.Debugf("StreamListObject stream finished for agent %s", addr) scancel() return nil } @@ -191,17 +204,26 @@ func (c *correct) correct(ctx context.Context) (err error) { return err } - if res.GetVector() == nil { + vec := res.GetVector() + if vec == nil { st := res.GetStatus() log.Error(st.GetCode(), st.GetMessage(), st.GetDetails()) // continue return nil } - log.Debugf("received object in StreamListObject: agent(%s), id(%s), timestamp(%v)", addr, res.GetVector().GetId(), res.GetVector().GetTimestamp()) + // skip if the vector is inserted after correction start + if vec.GetTimestamp() > correctionStartTime.UnixNano() { + log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...", + vec.GetId(), + vec.GetTimestamp(), + correctionStartTime.UnixNano(), + ) + return nil + } // check if the index is already checked - id := res.GetVector().GetId() + id := vec.GetId() _, ok, err := c.checkedID.Get([]byte(id)) if err != nil { log.Errorf("failed to perform Get from bbolt: %v", err) @@ -215,7 +237,7 @@ func (c *correct) correct(ctx context.Context) (err error) { ctx, &vectorReplica{ addr: addr, - vec: res.GetVector(), + vec: vec, }, leftAgentAddrs, ); err != nil { @@ -271,6 +293,13 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep } } + // Vector with time after this should not be processed + correctionStartTime, err := getCorrectionStartTime(ctx) + if err != nil { + log.Errorf("cannot determine correction start time: %w", err) + return err + } + foundReplicas := make([]*vectorReplica, 0, len(availableAddrs)) var mu sync.Mutex if err := c.discoverer.GetClient().OrderedRangeConcurrent(ctx, leftAgentAddrs, len(leftAgentAddrs), @@ -286,7 +315,7 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep default: } vc := vald.NewValdClient(conn) - v, err := vc.GetObject(ctx, &payload.Object_VectorRequest{ + vec, err := vc.GetObject(ctx, &payload.Object_VectorRequest{ Id: &payload.Object_ID{ Id: targetReplica.vec.GetId(), }, @@ -304,13 +333,20 @@ func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorRep } } - // the target replica is found in this agent with the addr - log.Debugf("object found: agent(%s), id(%v), timestamp(%v)", addr, v.GetId(), v.GetTimestamp()) + // skip if the vector is inserted after correction start + if vec.GetTimestamp() > correctionStartTime.UnixNano() { + log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...", + vec.GetId(), + vec.GetTimestamp(), + correctionStartTime.UnixNano(), + ) + return nil + } mu.Lock() foundReplicas = append(foundReplicas, &vectorReplica{ addr: addr, - vec: v, + vec: vec, }) // Remove this addr from availableAddrs because this addr has the target replica @@ -554,3 +590,19 @@ func (c *correct) loadInfos(ctx context.Context) (err error) { }) return nil } + +func embedTime(ctx context.Context) context.Context { + v := ctx.Value(correctionStartTimeKey) + if _, ok := v.(time.Time); ok { + return ctx + } + return context.WithValue(ctx, correctionStartTimeKey, time.Now()) +} + +func getCorrectionStartTime(ctx context.Context) (time.Time, error) { + v := ctx.Value(correctionStartTimeKey) + if t, ok := v.(time.Time); ok { + return t, nil + } + return time.Time{}, fmt.Errorf("timeKey is not embeded in context") +}