Skip to content

Commit

Permalink
puller(cdc): more tests and logs for frontier and kv-client (#10333)
Browse files Browse the repository at this point in the history
close #10370
  • Loading branch information
hicqu authored Dec 28, 2023
1 parent 5ce4090 commit 8ca4b58
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 52 deletions.
138 changes: 97 additions & 41 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
log.Debug("created stream to store",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.String("addr", addr))
zap.Int64("tableID", c.tableID),
zap.String("tableName", c.tableName),
zap.String("store", addr))
return nil
}
if c.config.Debug.EnableKVConnectBackOff {
Expand Down Expand Up @@ -516,6 +518,8 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
log.Info("request expired",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", &sri.span),
zap.Any("retrySpans", res.RetryRanges))
Expand Down Expand Up @@ -567,9 +571,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) {
s.rangeLock.UnlockRange(errorInfo.span.StartKey, errorInfo.span.EndKey,
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs())
log.Info("region failed", zap.Stringer("span", &errorInfo.span),
zap.Any("regionId", errorInfo.verID.GetID()),
zap.Error(errorInfo.err))

select {
case s.errCh.In() <- errorInfo:
s.errChSizeGauge.Inc()
Expand Down Expand Up @@ -647,9 +649,10 @@ func (s *eventFeedSession) requestRegionToStore(
log.Warn("get grpc stream client failed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", requestID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", storeAddr),
zap.Error(err))
if cerror.ErrVersionIncompatible.Equal(err) {
// It often occurs on rolling update. Sleep 20s to reduce logs.
Expand All @@ -669,10 +672,12 @@ func (s *eventFeedSession) requestRegionToStore(
log.Info("creating new stream to store to send request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
zap.String("addr", storeAddr))
zap.String("store", storeAddr))

g.Go(func() error {
defer s.deleteStream(storeAddr)
Expand All @@ -688,7 +693,10 @@ func (s *eventFeedSession) requestRegionToStore(
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", storeAddr))
zap.Uint64("storeID", storeID),
zap.String("store", storeAddr),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Uint64("requestID", requestID))
return cerror.ErrUnexpected.FastGenByArgs("pending regions is not found for store")
}

Expand All @@ -700,8 +708,11 @@ func (s *eventFeedSession) requestRegionToStore(
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", storeAddr),
zap.Uint64("regionID", sri.verID.GetID()),
zap.String("addr", storeAddr))
zap.Uint64("requestID", requestID),
zap.Stringer("span", &sri.span))

err = stream.client.Send(req)

Expand All @@ -713,8 +724,8 @@ func (s *eventFeedSession) requestRegionToStore(
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Uint64("storeID", storeID),
zap.String("store", storeAddr),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", requestID),
zap.Error(err))
Expand All @@ -724,10 +735,8 @@ func (s *eventFeedSession) requestRegionToStore(
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Uint64("storeID", storeID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", requestID),
zap.String("store", storeAddr),
zap.Error(err))
}
// Delete the stream from the map so that the next time the store is accessed, the stream will be
Expand All @@ -746,6 +755,13 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

log.Info("region send to store failed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("regionId", sri.verID.GetID()),
zap.Stringer("span", &sri.span))
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
s.onRegionFail(ctx, errInfo)
}
Expand Down Expand Up @@ -860,6 +876,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
log.Warn("load regions failed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("span", nextSpan),
zap.Error(retryErr))
return retryErr
Expand Down Expand Up @@ -900,6 +918,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", errInfo.verID.GetID()),
zap.Stringer("span", &errInfo.span),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
Expand Down Expand Up @@ -927,13 +947,17 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
log.Error("tikv reported compatibility error, which is not expected",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("rpcCtx", errInfo.rpcCtx.String()),
zap.Stringer("error", compatibility))
return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility)
} else if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil {
log.Error("tikv reported the request cluster ID mismatch error, which is not expected",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("tikvCurrentClusterID", mismatch.Current),
zap.Uint64("requestClusterID", mismatch.Request))
return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request)
Expand All @@ -942,6 +966,8 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
log.Warn("receive empty or unknown error msg",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Stringer("error", innerErr))
}
case *rpcCtxUnavailableErr:
Expand Down Expand Up @@ -1006,12 +1032,22 @@ func (s *eventFeedSession) receiveFromStream(
log.Info("stream to store closed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.String("addr", addr), zap.Uint64("storeID", storeID))
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.String("store", addr))

failpoint.Inject("kvClientStreamCloseDelay", nil)

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
log.Info("region canceled",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("regionId", state.sri.verID.GetID()),
zap.Stringer("span", &state.sri.span))
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
s.onRegionFail(parentCtx, errInfo)
}
Expand Down Expand Up @@ -1040,10 +1076,14 @@ func (s *eventFeedSession) receiveFromStream(
eg.Go(func() error {
err := handleExit(worker.run())
if err != nil {
log.Error("region worker exited with error", zap.Error(err),
zap.Any("changefeed", s.changefeed),
zap.Any("addr", addr),
zap.Any("storeID", storeID))
log.Error("region worker exited with error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("store", addr),
zap.Any("storeID", storeID),
zap.Error(err))
}
return err
})
Expand All @@ -1068,22 +1108,22 @@ func (s *eventFeedSession) receiveFromStream(
})
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Info(
"receive from stream canceled",
log.Info("receive from stream canceled",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.String("addr", addr),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
)
zap.String("store", addr))
} else {
log.Warn(
"failed to receive from stream",
log.Warn("failed to receive from stream",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.String("addr", addr),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("storeID", storeID),
zap.Error(err),
)
zap.String("store", addr),
zap.Error(err))
// Note that PD needs at least 10s+ to tag a kv node as disconnected if the kv node is down.
// TiKV raft needs to wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new
// election.
Expand Down Expand Up @@ -1120,15 +1160,17 @@ func (s *eventFeedSession) receiveFromStream(
log.Warn("change data event size too large",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Int("size", size), zap.Int("count", len(cevent.Events)),
zap.Int("resolvedRegionCount", regionCount))
}

if commitTs := getChangeDataEventCommitTs(cevent); commitTs > 0 && maxCommitTs < commitTs {
maxCommitTs = commitTs
}

err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr)
err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions)
if err != nil {
return err
}
Expand Down Expand Up @@ -1166,7 +1208,6 @@ func (s *eventFeedSession) sendRegionChangeEvents(
events []*cdcpb.Event,
worker *regionWorker,
pendingRegions *syncRegionFeedStateMap,
addr string,
) error {
statefulEvents := make([][]*regionStatefulEvent, worker.concurrency)
for i := 0; i < worker.concurrency; i++ {
Expand All @@ -1187,17 +1228,17 @@ func (s *eventFeedSession) sendRegionChangeEvents(
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("oldRequestID", state.requestID),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
zap.Uint64("requestID", event.RequestId))
valid = false
} else if state.requestID > event.RequestId {
log.Warn("drop event due to event belongs to a stale request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.Uint64("currRequestID", state.requestID),
zap.String("addr", addr))
zap.Uint64("currRequestID", state.requestID))
continue
}
}
Expand All @@ -1212,20 +1253,30 @@ func (s *eventFeedSession) sendRegionChangeEvents(
log.Warn("drop event due to region feed is removed",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
zap.Uint64("requestID", event.RequestId))
continue
}
state.start()
worker.setRegionState(event.RegionId, state)
log.Info("event feeds puts state into region worker",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.Stringer("span", &state.sri.span))
} else if state.isStale() {
log.Warn("drop event due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.String("addr", addr))
zap.Uint64("requestID", event.RequestId))
continue
}

Expand All @@ -1237,6 +1288,8 @@ func (s *eventFeedSession) sendRegionChangeEvents(
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Uint64("requestID", event.RequestId),
zap.Stringer("span", &state.sri.span),
zap.Any("error", x.Error))
}

Expand Down Expand Up @@ -1349,7 +1402,10 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName))
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion),
zap.Bool("holesExist", len(attr.Holes) > 0),
zap.Int("lockedRegionCount", attr.LockedRegionCount))

if attr.SlowestRegion.Initialized {
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
Expand All @@ -1360,14 +1416,14 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
} else if currTime.Sub(attr.SlowestRegion.Created) > 1*time.Minute {
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
} else if currTime.Sub(ckptTime) > 10*time.Minute {
} else if currTime.Sub(ckptTime) > 1*time.Minute {
log.Info("event feed finds a uninitialized slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
Expand Down
7 changes: 6 additions & 1 deletion cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
}
}
if s.sri.requestedTable != nil {
s.sri.requestedTable.postUpdateRegionResolvedTs(s.sri.verID.GetID(), state)
s.sri.requestedTable.postUpdateRegionResolvedTs(
s.sri.verID.GetID(),
s.sri.verID.GetVer(),
state,
s.sri.span,
)
}
}

Expand Down
Loading

0 comments on commit 8ca4b58

Please sign in to comment.