From 8ca4b58dd39d09579bfc39823a781a7edc172d11 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 28 Dec 2023 11:19:58 +0800 Subject: [PATCH] puller(cdc): more tests and logs for frontier and kv-client (#10333) close pingcap/tiflow#10370 --- cdc/kv/client.go | 138 ++++++++++++++------ cdc/kv/region_state.go | 7 +- cdc/kv/region_worker.go | 20 ++- cdc/kv/regionlock/region_range_lock.go | 20 ++- cdc/kv/regionlock/region_range_lock_test.go | 31 +++++ cdc/kv/shared_client.go | 4 +- cdc/kv/shared_region_worker.go | 4 +- cdc/puller/frontier/frontier_test.go | 132 +++++++++++++++++++ pkg/spanz/convert.go | 11 ++ pkg/spanz/convert_test.go | 6 + 10 files changed, 321 insertions(+), 52 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 21925d96c89..74b3c506d88 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -251,7 +251,9 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) log.Debug("created stream to store", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), - zap.String("addr", addr)) + zap.Int64("tableID", c.tableID), + zap.String("tableName", c.tableName), + zap.String("store", addr)) return nil } if c.config.Debug.EnableKVConnectBackOff { @@ -516,6 +518,8 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single log.Info("request expired", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), zap.Any("retrySpans", res.RetryRanges)) @@ -567,9 +571,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) { s.rangeLock.UnlockRange(errorInfo.span.StartKey, errorInfo.span.EndKey, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs()) - log.Info("region failed", zap.Stringer("span", &errorInfo.span), - zap.Any("regionId", errorInfo.verID.GetID()), - zap.Error(errorInfo.err)) + select { case s.errCh.In() <- errorInfo: s.errChSizeGauge.Inc() @@ -647,9 +649,10 @@ func (s *eventFeedSession) requestRegionToStore( log.Warn("get grpc stream client failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), - zap.Uint64("regionID", regionID), - zap.Uint64("requestID", requestID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("storeID", storeID), + zap.String("store", storeAddr), zap.Error(err)) if cerror.ErrVersionIncompatible.Equal(err) { // It often occurs on rolling update. Sleep 20s to reduce logs. @@ -669,10 +672,12 @@ func (s *eventFeedSession) requestRegionToStore( log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), - zap.String("addr", storeAddr)) + zap.String("store", storeAddr)) g.Go(func() error { defer s.deleteStream(storeAddr) @@ -688,7 +693,10 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("store", storeAddr)) + zap.Uint64("storeID", storeID), + zap.String("store", storeAddr), + zap.Uint64("regionID", sri.verID.GetID()), + zap.Uint64("requestID", requestID)) return cerror.ErrUnexpected.FastGenByArgs("pending regions is not found for store") } @@ -700,8 +708,11 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), + zap.Uint64("storeID", storeID), + zap.String("store", storeAddr), zap.Uint64("regionID", sri.verID.GetID()), - zap.String("addr", storeAddr)) + zap.Uint64("requestID", requestID), + zap.Stringer("span", &sri.span)) err = stream.client.Send(req) @@ -713,8 +724,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), zap.Uint64("storeID", storeID), + zap.String("store", storeAddr), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Error(err)) @@ -724,10 +735,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), zap.Uint64("storeID", storeID), - zap.Uint64("regionID", regionID), - zap.Uint64("requestID", requestID), + zap.String("store", storeAddr), zap.Error(err)) } // Delete the stream from the map so that the next time the store is accessed, the stream will be @@ -746,6 +755,13 @@ func (s *eventFeedSession) requestRegionToStore( continue } + log.Info("region send to store failed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("regionId", sri.verID.GetID()), + zap.Stringer("span", &sri.span)) errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{}) s.onRegionFail(ctx, errInfo) } @@ -860,6 +876,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( log.Warn("load regions failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Any("span", nextSpan), zap.Error(retryErr)) return retryErr @@ -900,6 +918,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), + zap.Uint64("regionID", errInfo.verID.GetID()), + zap.Stringer("span", &errInfo.span), zap.Stringer("error", innerErr)) if notLeader := innerErr.GetNotLeader(); notLeader != nil { @@ -927,6 +947,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI log.Error("tikv reported compatibility error, which is not expected", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.String("rpcCtx", errInfo.rpcCtx.String()), zap.Stringer("error", compatibility)) return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility) @@ -934,6 +956,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI log.Error("tikv reported the request cluster ID mismatch error, which is not expected", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("tikvCurrentClusterID", mismatch.Current), zap.Uint64("requestClusterID", mismatch.Request)) return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request) @@ -942,6 +966,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI log.Warn("receive empty or unknown error msg", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Stringer("error", innerErr)) } case *rpcCtxUnavailableErr: @@ -1006,12 +1032,22 @@ func (s *eventFeedSession) receiveFromStream( log.Info("stream to store closed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), - zap.String("addr", addr), zap.Uint64("storeID", storeID)) + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("storeID", storeID), + zap.String("store", addr)) failpoint.Inject("kvClientStreamCloseDelay", nil) remainingRegions := pendingRegions.takeAll() for _, state := range remainingRegions { + log.Info("region canceled", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("regionId", state.sri.verID.GetID()), + zap.Stringer("span", &state.sri.span)) errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs()) s.onRegionFail(parentCtx, errInfo) } @@ -1040,10 +1076,14 @@ func (s *eventFeedSession) receiveFromStream( eg.Go(func() error { err := handleExit(worker.run()) if err != nil { - log.Error("region worker exited with error", zap.Error(err), - zap.Any("changefeed", s.changefeed), - zap.Any("addr", addr), - zap.Any("storeID", storeID)) + log.Error("region worker exited with error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("store", addr), + zap.Any("storeID", storeID), + zap.Error(err)) } return err }) @@ -1068,22 +1108,22 @@ func (s *eventFeedSession) receiveFromStream( }) if err != nil { if status.Code(errors.Cause(err)) == codes.Canceled { - log.Info( - "receive from stream canceled", + log.Info("receive from stream canceled", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), - zap.String("addr", addr), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("storeID", storeID), - ) + zap.String("store", addr)) } else { - log.Warn( - "failed to receive from stream", + log.Warn("failed to receive from stream", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), - zap.String("addr", addr), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("storeID", storeID), - zap.Error(err), - ) + zap.String("store", addr), + zap.Error(err)) // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new // election @@ -1120,7 +1160,9 @@ func (s *eventFeedSession) receiveFromStream( log.Warn("change data event size too large", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), - zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Int("size", size), zap.Int("count", len(cevent.Events)), zap.Int("resolvedRegionCount", regionCount)) } @@ -1128,7 +1170,7 @@ func (s *eventFeedSession) receiveFromStream( maxCommitTs = commitTs } - err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr) + err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions) if err != nil { return err } @@ -1166,7 +1208,6 @@ func (s *eventFeedSession) sendRegionChangeEvents( events []*cdcpb.Event, worker *regionWorker, pendingRegions *syncRegionFeedStateMap, - addr string, ) error { statefulEvents := make([][]*regionStatefulEvent, worker.concurrency) for i := 0; i < worker.concurrency; i++ { @@ -1187,17 +1228,17 @@ func (s *eventFeedSession) sendRegionChangeEvents( zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", event.RegionId), zap.Uint64("oldRequestID", state.requestID), - zap.Uint64("requestID", event.RequestId), - zap.String("addr", addr)) + zap.Uint64("requestID", event.RequestId)) valid = false } else if state.requestID > event.RequestId { log.Warn("drop event due to event belongs to a stale request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("regionID", event.RegionId), zap.Uint64("requestID", event.RequestId), - zap.Uint64("currRequestID", state.requestID), - zap.String("addr", addr)) + zap.Uint64("currRequestID", state.requestID)) continue } } @@ -1212,20 +1253,30 @@ func (s *eventFeedSession) sendRegionChangeEvents( log.Warn("drop event due to region feed is removed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("regionID", event.RegionId), - zap.Uint64("requestID", event.RequestId), - zap.String("addr", addr)) + zap.Uint64("requestID", event.RequestId)) continue } state.start() worker.setRegionState(event.RegionId, state) + log.Info("event feeds puts state into region worker", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("regionID", event.RegionId), + zap.Uint64("requestID", event.RequestId), + zap.Stringer("span", &state.sri.span)) } else if state.isStale() { log.Warn("drop event due to region feed stopped", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), zap.Uint64("regionID", event.RegionId), - zap.Uint64("requestID", event.RequestId), - zap.String("addr", addr)) + zap.Uint64("requestID", event.RequestId)) continue } @@ -1237,6 +1288,8 @@ func (s *eventFeedSession) sendRegionChangeEvents( zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Uint64("regionID", event.RegionId), + zap.Uint64("requestID", event.RequestId), + zap.Stringer("span", &state.sri.span), zap.Any("error", x.Error)) } @@ -1349,7 +1402,10 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName)) + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion), + zap.Bool("holesExist", len(attr.Holes) > 0), + zap.Int("lockedRegionCount", attr.LockedRegionCount)) if attr.SlowestRegion.Initialized { if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { @@ -1360,14 +1416,14 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { zap.String("tableName", s.tableName), zap.Any("slowRegion", attr.SlowestRegion)) } - } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + } else if currTime.Sub(attr.SlowestRegion.Created) > 1*time.Minute { log.Info("event feed initializes a region too slow", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Any("slowRegion", attr.SlowestRegion)) - } else if currTime.Sub(ckptTime) > 10*time.Minute { + } else if currTime.Sub(ckptTime) > 1*time.Minute { log.Info("event feed finds a uninitialized slow region", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 00cee2b8572..1263eaee9e7 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -150,7 +150,12 @@ func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) { } } if s.sri.requestedTable != nil { - s.sri.requestedTable.postUpdateRegionResolvedTs(s.sri.verID.GetID(), state) + s.sri.requestedTable.postUpdateRegionResolvedTs( + s.sri.verID.GetID(), + s.sri.verID.GetVer(), + state, + s.sri.span, + ) } } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 01d425558de..3f430878dbb 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -214,6 +214,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), @@ -388,6 +390,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv log.Info("receive admin event", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), zap.Stringer("event", event.changeEvent)) case *cdcpb.Event_Error: err = w.handleSingleRegionError( @@ -439,7 +443,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { exitFn := func() error { log.Info("region worker closed by error", zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } @@ -636,7 +642,7 @@ func (w *regionWorker) handleEventEntry( return false } } - return handleEventEntry(x, w.session.startTs, state, w.metrics, emit) + return handleEventEntry(x, w.session.startTs, state, w.metrics, emit, w.session.changefeed, w.session.tableID) } func handleEventEntry( @@ -645,6 +651,8 @@ func handleEventEntry( state *regionFeedState, metrics *regionWorkerMetrics, emit func(assembled model.RegionFeedEvent) bool, + changefeed model.ChangeFeedID, + tableID model.TableID, ) error { regionID, regionSpan, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { @@ -660,6 +668,14 @@ func handleEventEntry( case cdcpb.Event_INITIALIZED: metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() + log.Info("region is initialized", + zap.String("namespace", changefeed.Namespace), + zap.String("changefeed", changefeed.ID), + zap.Int64("tableID", tableID), + zap.Uint64("regionID", regionID), + zap.Uint64("requestID", state.requestID), + zap.Stringer("span", &state.sri.span)) + for _, cachedEvent := range state.matcher.matchCachedRow(true) { revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go index 3c375f843df..8d4a919918f 100644 --- a/cdc/kv/regionlock/region_range_lock.go +++ b/cdc/kv/regionlock/region_range_lock.go @@ -420,6 +420,13 @@ func (l *RegionRangeLock) UnlockRange( return } +// LockedRanges returns count of locked ranges. +func (l *RegionRangeLock) LockedRanges() int { + l.mu.Lock() + defer l.mu.Unlock() + return l.rangeLock.Len() +} + // RefCount returns how many ranges are locked. func (l *RegionRangeLock) RefCount() uint64 { l.mu.Lock() @@ -475,17 +482,19 @@ type LockedRange struct { // CollectLockedRangeAttrs collects locked range attributes. func (l *RegionRangeLock) CollectLockedRangeAttrs( - action func(regionID uint64, state *LockedRange), + action func(regionID, version uint64, state *LockedRange, span tablepb.Span), ) (r CollectedLockedRangeAttrs) { l.mu.Lock() defer l.mu.Unlock() + r.LockedRegionCount = l.rangeLock.Len() r.FastestRegion.CheckpointTs = 0 r.SlowestRegion.CheckpointTs = math.MaxUint64 lastEnd := l.totalSpan.StartKey l.rangeLock.Ascend(func(item *rangeLockEntry) bool { if action != nil { - action(item.regionID, &item.state) + span := tablepb.Span{StartKey: item.startKey, EndKey: item.endKey} + action(item.regionID, item.version, &item.state, span) } if spanz.EndCompare(lastEnd, item.startKey) < 0 { r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey}) @@ -514,9 +523,10 @@ func (l *RegionRangeLock) CollectLockedRangeAttrs( // CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. type CollectedLockedRangeAttrs struct { - Holes []tablepb.Span - FastestRegion LockedRangeAttrs - SlowestRegion LockedRangeAttrs + LockedRegionCount int + Holes []tablepb.Span + FastestRegion LockedRangeAttrs + SlowestRegion LockedRangeAttrs } // LockedRangeAttrs is like `LockedRange`, but only contains some read-only attributes. diff --git a/cdc/kv/regionlock/region_range_lock_test.go b/cdc/kv/regionlock/region_range_lock_test.go index af887248164..fc85bf68706 100644 --- a/cdc/kv/regionlock/region_range_lock_test.go +++ b/cdc/kv/regionlock/region_range_lock_test.go @@ -219,3 +219,34 @@ func TestRangeTsMap(t *testing.T) { mustGetMin("b", "e", 100) mustGetMin("a", "z", 80) } + +func TestRegionRangeLockCollect(t *testing.T) { + t.Parallel() + + ctx := context.Background() + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), 100, "") + attrs := l.CollectLockedRangeAttrs(nil) + require.Equal(t, 1, len(attrs.Holes)) + + l.LockRange(ctx, []byte("a"), []byte("m"), 1, 1).LockedRange.CheckpointTs.Add(1) + attrs = l.CollectLockedRangeAttrs(nil) + require.Equal(t, 1, len(attrs.Holes)) + require.Equal(t, uint64(101), attrs.SlowestRegion.CheckpointTs) + require.Equal(t, uint64(101), attrs.FastestRegion.CheckpointTs) + + l.LockRange(ctx, []byte("m"), []byte("z"), 2, 1).LockedRange.CheckpointTs.Add(2) + attrs = l.CollectLockedRangeAttrs(nil) + require.Equal(t, 0, len(attrs.Holes)) + require.Equal(t, uint64(101), attrs.SlowestRegion.CheckpointTs) + require.Equal(t, uint64(102), attrs.FastestRegion.CheckpointTs) + + l.UnlockRange([]byte("a"), []byte("m"), 1, 1) + attrs = l.CollectLockedRangeAttrs(nil) + require.Equal(t, 1, len(attrs.Holes)) + require.Equal(t, uint64(102), attrs.SlowestRegion.CheckpointTs) + require.Equal(t, uint64(102), attrs.FastestRegion.CheckpointTs) + + l.UnlockRange([]byte("m"), []byte("z"), 2, 1) + attrs = l.CollectLockedRangeAttrs(nil) + require.Equal(t, 1, len(attrs.Holes)) +} diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index cf5ad03eebc..c53e6ac22cb 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -126,7 +126,7 @@ type requestedTable struct { stopped atomic.Bool // To handle lock resolvings. - postUpdateRegionResolvedTs func(regionID uint64, state *regionlock.LockedRange) + postUpdateRegionResolvedTs func(regionID, version uint64, state *regionlock.LockedRange, span tablepb.Span) staleLocksVersion atomic.Uint64 } @@ -770,7 +770,7 @@ func (s *SharedClient) newRequestedTable( eventCh: eventCh, } - rt.postUpdateRegionResolvedTs = func(regionID uint64, state *regionlock.LockedRange) { + rt.postUpdateRegionResolvedTs = func(regionID, _ uint64, state *regionlock.LockedRange, _ tablepb.Span) { maxVersion := rt.staleLocksVersion.Load() if state.CheckpointTs.Load() <= maxVersion && state.Initialzied.Load() { enter := time.Now() diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index 83b7805712d..732b148f1ff 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -163,12 +163,14 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even return false } } + tableID := state.sri.requestedTable.span.TableID log.Debug("region worker get an Event", zap.String("namespace", w.changefeed.Namespace), zap.String("changefeed", w.changefeed.ID), zap.Any("subscriptionID", state.sri.requestedTable.subscriptionID), + zap.Int64("tableID", tableID), zap.Int("rows", len(x.Entries.GetEntries()))) - return handleEventEntry(x, startTs, state, w.metrics, emit) + return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID) } func (w *sharedRegionWorker) handleResolvedTs(ctx context.Context, batch resolvedTsBatch) { diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 2ca32716016..e096f20f42a 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -15,11 +15,13 @@ package frontier import ( "bytes" + "context" "math" "math/rand" "sort" "testing" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" @@ -435,3 +437,133 @@ func TestFrontierEntries(t *testing.T) { require.Equal(t, []byte("a"), []byte(slowestRange.StartKey)) require.Equal(t, []byte("b"), []byte(slowestRange.EndKey)) } + +func TestRandomMergeAndSplit(t *testing.T) { + t.Parallel() + + start, end := spanz.GetTableRange(8616) + rangelock := regionlock.NewRegionRangeLock(1, start, end, 100, "") + frontier := NewFrontier(100, tablepb.Span{StartKey: start, EndKey: end}) + ctx := context.Background() + + var nextRegionID uint64 = 1 + var nextVersion uint64 = 1 + var nextTs uint64 = 100 + rangelock.LockRange(ctx, start, end, nextRegionID, nextVersion) + + nextTs += 1 + frontier.Forward(1, tablepb.Span{StartKey: start, EndKey: end}, nextTs) + require.Equal(t, nextTs, frontier.Frontier()) + + for i := 0; i < 100000; i++ { + totalLockedRanges := rangelock.LockedRanges() + unchangedRegions := make([]lockedRegion, 0, totalLockedRanges) + + mergeOrSplit := "split" + if totalLockedRanges > 1 && rand.Intn(2) > 0 { + mergeOrSplit = "merge" + } + + nextTs += 1 + if mergeOrSplit == "split" { + var r1, r2 lockedRegion + selected := rand.Intn(totalLockedRanges) + count := 0 + rangelock.CollectLockedRangeAttrs(func(regionID, version uint64, state *regionlock.LockedRange, span tablepb.Span) { + ts := state.CheckpointTs.Load() + startKey := span.StartKey + endKey := span.EndKey + if count == selected { + r1 = lockedRegion{regionID, version, startKey, endKey, ts} + } else { + r := lockedRegion{regionID, version, startKey, endKey, ts} + unchangedRegions = append(unchangedRegions, r) + } + count += 1 + }) + + rangelock.UnlockRange(r1.startKey, r1.endKey, r1.regionID, r1.version) + + r2 = r1.split(&nextRegionID, &nextVersion) + rangelock.LockRange(ctx, r1.startKey, r1.endKey, r1.regionID, nextVersion) + rangelock.LockRange(ctx, r2.startKey, r2.endKey, r2.regionID, nextVersion) + + frontier.Forward(r1.regionID, tablepb.Span{StartKey: r1.startKey, EndKey: r1.endKey}, nextTs) + frontier.Forward(r2.regionID, tablepb.Span{StartKey: r2.startKey, EndKey: r2.endKey}, nextTs) + } else { + var r1, r2 lockedRegion + selected := rand.Intn(totalLockedRanges - 1) + count := 0 + rangelock.CollectLockedRangeAttrs(func(regionID, version uint64, state *regionlock.LockedRange, span tablepb.Span) { + ts := state.CheckpointTs.Load() + startKey := span.StartKey + endKey := span.EndKey + if count == selected { + r1 = lockedRegion{regionID, version, startKey, endKey, ts} + } else if count == selected+1 { + r2 = lockedRegion{regionID, version, startKey, endKey, ts} + } else { + r := lockedRegion{regionID, version, startKey, endKey, ts} + unchangedRegions = append(unchangedRegions, r) + } + count += 1 + }) + + rangelock.UnlockRange(r1.startKey, r1.endKey, r1.regionID, r1.version) + rangelock.UnlockRange(r2.startKey, r2.endKey, r2.regionID, r2.version) + + r2.merge(r1, &nextVersion) + rangelock.LockRange(ctx, r2.startKey, r2.endKey, r2.regionID, nextVersion) + + frontier.Forward(r2.regionID, tablepb.Span{StartKey: r2.startKey, EndKey: r2.endKey}, nextTs) + } + for _, r := range unchangedRegions { + frontier.Forward(r.regionID, tablepb.Span{StartKey: r.startKey, EndKey: r.endKey}, nextTs) + } + require.Equal(t, nextTs, frontier.Frontier()) + } +} + +type lockedRegion struct { + regionID uint64 + version uint64 + startKey []byte + endKey []byte + ts uint64 +} + +func (r *lockedRegion) split(regionIDGen *uint64, versionGen *uint64) (s lockedRegion) { + *regionIDGen += 1 + *versionGen += 1 + + s.regionID = *regionIDGen + s.version = *versionGen + s.ts = r.ts + s.startKey = r.startKey + + s.endKey = make([]byte, len(r.startKey)+1) + copy(s.endKey, r.startKey) + for { + s.endKey[len(s.endKey)-1] = '1' + if bytes.Compare(s.endKey, r.endKey) < 0 { + break + } + s.endKey[len(s.endKey)-1] = '0' + s.endKey = append(s.endKey, '0') + } + + r.version = *versionGen + r.startKey = make([]byte, len(s.endKey)) + copy(r.startKey, s.endKey) + return +} + +func (r *lockedRegion) merge(s lockedRegion, versionGen *uint64) { + if !bytes.Equal(r.startKey, s.endKey) { + panic("bad merge") + } + + *versionGen += 1 + r.startKey = s.startKey + r.version = *versionGen +} diff --git a/pkg/spanz/convert.go b/pkg/spanz/convert.go index 6668f55250f..442f6ad7364 100644 --- a/pkg/spanz/convert.go +++ b/pkg/spanz/convert.go @@ -14,6 +14,7 @@ package spanz import ( + "fmt" "reflect" "sort" "unsafe" @@ -23,6 +24,16 @@ import ( "go.uber.org/zap" ) +// HexKey returns a hex string generated from the key. +func HexKey(key []byte) string { + // TODO(qupeng): improve the function. + str := "" + for _, c := range key { + str += fmt.Sprintf("%02X", c) + } + return str +} + // ArrayToSpan converts an array of TableID to an array of Span. func ArrayToSpan(in []tablepb.TableID) []tablepb.Span { out := make([]tablepb.Span, 0, len(in)) diff --git a/pkg/spanz/convert_test.go b/pkg/spanz/convert_test.go index b8a829ae684..f3e7c8db8bd 100644 --- a/pkg/spanz/convert_test.go +++ b/pkg/spanz/convert_test.go @@ -65,3 +65,9 @@ func TestUnsafeStringByte(t *testing.T) { require.EqualValues(t, len(s), len(ub)) require.EqualValues(t, len(s), cap(ub)) } + +func TestHexKey(t *testing.T) { + span := TableIDToComparableSpan(8616) + require.Equal(t, "7480000000000021FFA85F720000000000FA", HexKey(span.StartKey)) + require.Equal(t, "7480000000000021FFA85F730000000000FA", HexKey(span.EndKey)) +}