diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index a82376430fa..0291bc5863d 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta( return s.ams[groupID], s.kgs[groupID] } +func (s *state) checkTSOSplit( + targetGroupID uint32, +) (splitTargetAM, splitSourceAM *AllocatorManager, err error) { + s.RLock() + defer s.RUnlock() + splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID] + // Only the split target keyspace group needs to check the TSO split. + if !splitTargetGroup.IsSplitTarget() { + return nil, nil, nil // it isn't in the split state + } + sourceGroupID := splitTargetGroup.SplitSource() + splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID] + if splitSourceAM == nil || splitSourceGroup == nil { + log.Error("the split source keyspace group is not initialized", + zap.Uint32("source", sourceGroupID)) + return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID) + } + return splitTargetAM, splitSourceAM, nil +} + +// Reject any request if the keyspace group is in merging state, +// we need to wait for the merging checker to finish the TSO merging. +func (s *state) checkTSOMerge( + groupID uint32, +) error { + s.RLock() + defer s.RUnlock() + if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() { + return nil + } + return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID) +} + // getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace. // It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta // of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } - err = kgm.checkTSOMerge(curKeyspaceGroupID) + err = kgm.state.checkTSOMerge(curKeyspaceGroupID) if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } @@ -1032,19 +1065,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( keyspaceGroupID uint32, dcLocation string, ) error { - splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID) - // Only the split target keyspace group needs to check the TSO split. - if !splitGroup.IsSplitTarget() { - return nil - } - splitSource := splitGroup.SplitSource() - splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource) - if splitSourceAM == nil || splitSourceGroup == nil { - log.Error("the split source keyspace group is not initialized", - zap.Uint32("source", splitSource)) - return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource) + splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID) + if err != nil || splitTargetAM == nil { + return err } - splitAllocator, err := splitAM.GetAllocator(dcLocation) + splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation) if err != nil { return err } @@ -1052,7 +1077,7 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( if err != nil { return err } - splitTSO, err := splitAllocator.GenerateTSO(1) + splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1) if err != nil { return err } @@ -1061,19 +1086,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( return err } // If the split source TSO is not greater than the newly split TSO, we don't need to do anything. - if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 { + if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTargetTSO) <= 0 { log.Info("the split source tso is less than the newly split tso", zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), - zap.Int64("split-tso-physical", splitTSO.Physical), - zap.Int64("split-tso-logical", splitTSO.Logical)) + zap.Int64("split-tso-physical", splitTargetTSO.Physical), + zap.Int64("split-tso-logical", splitTargetTSO.Logical)) // Finish the split state directly. return kgm.finishSplitKeyspaceGroup(keyspaceGroupID) } // If the split source TSO is greater than the newly split TSO, we need to update the split // TSO to make sure the following TSO will be greater than the split keyspaces ever had // in the past. - err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{ + err = splitTargetAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{ Physical: splitSourceTSO.Physical + 1, Logical: splitSourceTSO.Logical, }), true, true) @@ -1083,8 +1108,8 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( log.Info("the split source tso is greater than the newly split tso", zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), - zap.Int64("split-tso-physical", splitTSO.Physical), - zap.Int64("split-tso-logical", splitTSO.Logical)) + zap.Int64("split-tso-physical", splitTargetTSO.Physical), + zap.Int64("split-tso-logical", splitTargetTSO.Logical)) // Finish the split state. return kgm.finishSplitKeyspaceGroup(keyspaceGroupID) } @@ -1116,9 +1141,13 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } - // Pre-update the split keyspace group split state in memory. - splitGroup.SplitState = nil - kgm.kgs[id] = splitGroup + // Pre-update the split keyspace group's split state in memory. + // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. + // For now, we only have scenarios to update split state/merge state, and the other fields are always + // loaded from etcd without any modification, so we can simply copy the group and replace the state. + newSplitGroup := *splitGroup + newSplitGroup.SplitState = nil + kgm.kgs[id] = &newSplitGroup return nil } @@ -1146,9 +1175,14 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { zap.Int("status-code", statusCode)) return errs.ErrSendRequest.FastGenByArgs() } - // Pre-update the split keyspace group split state in memory. - mergeTarget.MergeState = nil - kgm.kgs[id] = mergeTarget + + // Pre-update the merge target keyspace group's merge state in memory. + // Note: to avoid data race with state read APIs, we always replace the group in memory as a whole. + // For now, we only have scenarios to update split state/merge state, and the other fields are always + // loaded from etcd without any modification, so we can simply copy the group and replace the state. + newTargetGroup := *mergeTarget + newTargetGroup.MergeState = nil + kgm.kgs[id] = &newTargetGroup return nil } @@ -1286,15 +1320,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget return } } - -// Reject any request if the keyspace group is in merging state, -// we need to wait for the merging checker to finish the TSO merging. -func (kgm *KeyspaceGroupManager) checkTSOMerge( - keyspaceGroupID uint32, -) error { - _, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID) - if !group.IsMerging() { - return nil - } - return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID) -}