Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Nov 21, 2024
1 parent e58a145 commit c2f69f8
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 35 deletions.
2 changes: 1 addition & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (e *eventStore) RegisterDispatcher(
// maxEventCommitTs may not be updated correctly and cause data loss.(lost resolved ts is harmless)
// To fix it, we need to alloc subID and initialize dispatcherStat before puller may send events.
// That is allocate subID in a separate method.
stat.subID = e.puller.Subscribe(*tableSpan, startTs, subscriptionTag{
stat.subID = e.puller.Subscribe(tableSpan, startTs, subscriptionTag{
chIndex: chIndex,
tableID: tableSpan.TableID,
uniqueKeyID: uniqueKeyID,
Expand Down
4 changes: 2 additions & 2 deletions logservice/logpuller/log_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
type spanProgress struct {
client *SubscriptionClient

span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan

subID SubscriptionID

Expand Down Expand Up @@ -157,7 +157,7 @@ func (p *LogPuller) Close(ctx context.Context) error {
}

func (p *LogPuller) Subscribe(
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
startTs uint64,
tag interface{},
) SubscriptionID {
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/log_puller_multi_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type LogPullerMultiSpan struct {
func NewLogPullerMultiSpan(
client *SubscriptionClient,
pdClock pdutil.Clock,
spans []heartbeatpb.TableSpan,
spans []*heartbeatpb.TableSpan,
startTs uint64,
consume func(context.Context, *common.RawKVEntry) error,
) *LogPullerMultiSpan {
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/region_change_event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (w *changeEventProcessor) doHandle(
zap.Any("tableID", tableID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.region.span))
zap.Stringer("span", state.region.span))

for _, cachedEvent := range state.matcher.matchCachedRow(true) {
revent, err := w.assembleRowEvent(regionID, cachedEvent)
Expand Down
6 changes: 3 additions & 3 deletions logservice/logpuller/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type regionInfo struct {
// For instance, consider region-1 with a span of [a, d).
// It contains 3 tables: t1[a, b), t2[b,c), and t3[c,d).
// If only table t1 is subscribed to, then the span of interest is [a,b).
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
rpcCtx *tikv.RPCContext

// The table that the region belongs to.
Expand All @@ -52,7 +52,7 @@ func (s *regionInfo) isStopped() bool {

func newRegionInfo(
verID tikv.RegionVerID,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
rpcCtx *tikv.RPCContext,
subscribedSpan *subscribedSpan,
) regionInfo {
Expand Down Expand Up @@ -182,6 +182,6 @@ func (s *regionFeedState) getRegionInfo() regionInfo {
return s.region
}

func (s *regionFeedState) getRegionMeta() (uint64, heartbeatpb.TableSpan, string) {
func (s *regionFeedState) getRegionMeta() (uint64, *heartbeatpb.TableSpan, string) {
return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr
}
17 changes: 9 additions & 8 deletions logservice/logpuller/regionlock/region_range_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type LockRangeResult struct {

// RetryRanges is only used when Status is LockRangeStatusStale.
// It contains the ranges that should be retried to lock.
RetryRanges []heartbeatpb.TableSpan
RetryRanges []*heartbeatpb.TableSpan
}

// LockedRangeState is used to access the real-time state changes of a locked range.
Expand Down Expand Up @@ -109,7 +109,7 @@ type RangeLock struct {
// ID to identify different RangeLock instances, so logs of different instances can be distinguished.
id uint64
// totalSpan is the total range of the table, totalSpan = unlockedRanges + lockedRanges
totalSpan heartbeatpb.TableSpan
totalSpan *heartbeatpb.TableSpan

mu sync.RWMutex
// unlockedRanges is used to store the resolvedTs of unlocked ranges.
Expand All @@ -124,12 +124,13 @@ type RangeLock struct {
// NewRangeLock creates a new RangeLock.
func NewRangeLock(
id uint64,
startKey, endKey []byte, startTs uint64,
span *heartbeatpb.TableSpan,
startTs uint64,
) *RangeLock {
return &RangeLock{
id: id,
totalSpan: heartbeatpb.TableSpan{StartKey: startKey, EndKey: endKey},
unlockedRanges: newRangeTsMap(startKey, endKey, startTs),
totalSpan: span,
unlockedRanges: newRangeTsMap(span.StartKey, span.EndKey, startTs),
lockedRanges: btree.NewG(16, rangeLockEntryLess),
regionIDToLockedRanges: make(map[uint64]*rangeLockEntry),
}
Expand Down Expand Up @@ -456,7 +457,7 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
// If the range is stale, we should return the overlapping ranges to the caller,
// so that the caller can retry to lock the rest of the range.
if isStale {
retryRanges := make([]heartbeatpb.TableSpan, 0)
retryRanges := make([]*heartbeatpb.TableSpan, 0)
currentRangeStartKey := startKey

log.Info("try lock range staled",
Expand All @@ -474,13 +475,13 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
// The rest should come from range searching and is sorted in increasing order, and they
// must intersect with the current given range.
if bytes.Compare(currentRangeStartKey, r.startKey) < 0 {
retryRanges = append(retryRanges, heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: r.startKey})
retryRanges = append(retryRanges, &heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: r.startKey})
}
currentRangeStartKey = r.endKey
}

if bytes.Compare(currentRangeStartKey, endKey) < 0 {
retryRanges = append(retryRanges, heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: endKey})
retryRanges = append(retryRanges, &heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: endKey})
}

return LockRangeResult{
Expand Down
4 changes: 2 additions & 2 deletions logservice/logpuller/regionlock/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
)

// CheckRegionsLeftCover checks whether the regions cover the left part of given span
func CheckRegionsLeftCover(regions []*metapb.Region, span heartbeatpb.TableSpan) bool {
func CheckRegionsLeftCover(regions []*metapb.Region, span *heartbeatpb.TableSpan) bool {
subRegions := CutRegionsLeftCoverSpan(regions, span)
return len(regions) > 0 && len(subRegions) == len(regions)
}

// CutRegionsLeftCoverSpan processes a list of regions to remove those that
// do not cover the specified span or are discontinuous with the previous region.
// It returns a new slice containing only the continuous regions that cover the span.
func CutRegionsLeftCoverSpan(regions []*metapb.Region, spanToCover heartbeatpb.TableSpan) []*metapb.Region {
func CutRegionsLeftCoverSpan(regions []*metapb.Region, spanToCover *heartbeatpb.TableSpan) []*metapb.Region {
if len(regions) == 0 {
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type resolveLockTask struct {
// rangeTask represents a task to subscribe a range span of a table.
// It can be a part of a table or a whole table, it also can be a part of a region.
type rangeTask struct {
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
subscribedSpan *subscribedSpan
}

Expand All @@ -117,7 +117,7 @@ type subscribedSpan struct {
startTs tablepb.Ts

// The target span
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
// The range lock of the span,
// it is used to prevent duplicate requests to the same region range,
// and it also used to calculate this table's resolvedTs.
Expand Down Expand Up @@ -263,7 +263,7 @@ func (s *SubscriptionClient) initMetrics() {
// It new a subscribedSpan and store it in `s.totalSpans`,
// and send a rangeTask to `s.rangeTaskCh`.
// The rangeTask will be handled in `handleRangeTasks` goroutine.
func (s *SubscriptionClient) Subscribe(subID SubscriptionID, span heartbeatpb.TableSpan, startTs uint64) {
func (s *SubscriptionClient) Subscribe(subID SubscriptionID, span *heartbeatpb.TableSpan, startTs uint64) {
if span.TableID == 0 {
log.Panic("subscription client subscribe with zero TableID")
}
Expand Down Expand Up @@ -501,7 +501,7 @@ func (s *SubscriptionClient) handleRangeTasks(ctx context.Context) error {
// 3. Schedule a region request to subscribe the region.
func (s *SubscriptionClient) divideSpanAndScheduleRegionRequests(
ctx context.Context,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
subscribedSpan *subscribedSpan,
) error {
// Limit the number of regions loaded at a time to make the load more stable.
Expand Down Expand Up @@ -549,7 +549,7 @@ func (s *SubscriptionClient) divideSpanAndScheduleRegionRequests(
}

for _, regionMeta := range regionMetas {
regionSpan := heartbeatpb.TableSpan{
regionSpan := &heartbeatpb.TableSpan{
StartKey: regionMeta.StartKey,
EndKey: regionMeta.EndKey,
}
Expand Down Expand Up @@ -609,7 +609,7 @@ func (s *SubscriptionClient) scheduleRegionRequest(ctx context.Context, region r
}

func (s *SubscriptionClient) scheduleRangeRequest(
ctx context.Context, span heartbeatpb.TableSpan,
ctx context.Context, span *heartbeatpb.TableSpan,
subscribedSpan *subscribedSpan,
) {
select {
Expand Down Expand Up @@ -805,10 +805,10 @@ func (s *SubscriptionClient) logSlowRegions(ctx context.Context) error {

func (s *SubscriptionClient) newSubscribedSpan(
subID SubscriptionID,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
startTs uint64,
) *subscribedSpan {
rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs)
rangeLock := regionlock.NewRangeLock(uint64(subID), span, startTs)

rt := &subscribedSpan{
subID: subID,
Expand Down
8 changes: 4 additions & 4 deletions logservice/schemastore/ddl_job_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ const (
JobHistoryID = ddl.HistoryTableID
)

func getAllDDLSpan() []heartbeatpb.TableSpan {
spans := make([]heartbeatpb.TableSpan, 0, 2)
func getAllDDLSpan() []*heartbeatpb.TableSpan {
spans := make([]*heartbeatpb.TableSpan, 0, 2)
start, end := common.GetTableRange(JobTableID)
spans = append(spans, heartbeatpb.TableSpan{
spans = append(spans, &heartbeatpb.TableSpan{
TableID: JobTableID,
StartKey: common.ToComparableKey(start),
EndKey: common.ToComparableKey(end),
})
start, end = common.GetTableRange(JobHistoryID)
spans = append(spans, heartbeatpb.TableSpan{
spans = append(spans, &heartbeatpb.TableSpan{
TableID: JobHistoryID,
StartKey: common.ToComparableKey(start),
EndKey: common.ToComparableKey(end),
Expand Down
10 changes: 5 additions & 5 deletions pkg/common/span_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
var UpperBoundKey = []byte{255, 255, 255, 255, 255}

// HackTableSpan will set End as UpperBoundKey if End is Nil.
func HackTableSpan(span heartbeatpb.TableSpan) heartbeatpb.TableSpan {
func HackTableSpan(span *heartbeatpb.TableSpan) *heartbeatpb.TableSpan {
if span.StartKey == nil {
span.StartKey = []byte{}
}
Expand Down Expand Up @@ -92,10 +92,10 @@ func EndCompare(lhs []byte, rhs []byte) int {
}

// GetIntersectSpan return the intersect part of lhs and rhs span
func GetIntersectSpan(lhs heartbeatpb.TableSpan, rhs heartbeatpb.TableSpan) heartbeatpb.TableSpan {
func GetIntersectSpan(lhs *heartbeatpb.TableSpan, rhs *heartbeatpb.TableSpan) *heartbeatpb.TableSpan {
if len(lhs.StartKey) != 0 && EndCompare(lhs.StartKey, rhs.EndKey) >= 0 ||
len(rhs.StartKey) != 0 && EndCompare(rhs.StartKey, lhs.EndKey) >= 0 {
return heartbeatpb.TableSpan{
return &heartbeatpb.TableSpan{
StartKey: nil,
EndKey: nil,
}
Expand All @@ -113,15 +113,15 @@ func GetIntersectSpan(lhs heartbeatpb.TableSpan, rhs heartbeatpb.TableSpan) hear
end = rhs.EndKey
}

return heartbeatpb.TableSpan{
return &heartbeatpb.TableSpan{
StartKey: start,
EndKey: end,
}
}

// IsEmptySpan returns true if the span is empty.
// TODO: check whether need span.StartKey >= span.EndKey
func IsEmptySpan(span heartbeatpb.TableSpan) bool {
func IsEmptySpan(span *heartbeatpb.TableSpan) bool {
return len(span.StartKey) == 0 && len(span.EndKey) == 0
}

Expand Down

0 comments on commit c2f69f8

Please sign in to comment.