diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index 4d939d357dc..d276bd8ec18 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/logutil" - "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/syncutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" @@ -70,11 +69,11 @@ type ModeManager struct { initTime time.Time syncutil.RWMutex - config config.ReplicationModeConfig - storage endpoint.ReplicationStatusStorage - cluster schedule.Cluster - fileReplicater FileReplicater - replicatedMembers []uint64 + config config.ReplicationModeConfig + storage endpoint.ReplicationStatusStorage + cluster schedule.Cluster + fileReplicater FileReplicater + replicateState sync.Map drAutoSync drAutoSyncStatus // intermediate states of the recovery process @@ -240,7 +239,6 @@ func (m *ModeManager) drSwitchToAsyncWait(availableStores []uint64) error { return err } dr := drAutoSyncStatus{State: drStateAsyncWait, StateID: id, AvailableStores: availableStores} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -263,7 +261,6 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return err } dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -287,7 +284,6 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { } now := time.Now() dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now} - m.drPersistStatusWithLock(dr) if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -307,7 +303,6 @@ func (m *ModeManager) drSwitchToSync() error { return err } dr := drAutoSyncStatus{State: drStateSync, StateID: id} - m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -317,50 +312,6 @@ func (m *ModeManager) drSwitchToSync() error { return nil } -func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) { - ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) - defer cancel() - - members, err := m.fileReplicater.GetMembers() - if err != nil { - log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) - return - } - - data, _ := json.Marshal(status) - - m.replicatedMembers = m.replicatedMembers[:0] - for _, member := range members { - if err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data); err != nil { - log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err)) - // Throw away the error to make it possible to switch to async when - // primary and dr DC are disconnected. 
This will result in the - // inability to accurately determine whether data is fully - // synchronized when using dr DC to disaster recovery. - // Since the member will not be in `replicatedMembers` list, PD will - // try to replicate state file later. - } else { - m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId()) - } - } -} - -func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool { - m.RLock() - defer m.RUnlock() - return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list - return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated - return m.replicatedMembers[j] == members[i].GetMemberId() - }) - }) -} - -func (m *ModeManager) drPersistStatus() { - m.Lock() - defer m.Unlock() - m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID}) -} - func (m *ModeManager) drGetState() string { m.RLock() defer m.RUnlock() @@ -368,8 +319,9 @@ func (m *ModeManager) drGetState() string { } const ( - idleTimeout = time.Minute - tickInterval = 500 * time.Millisecond + idleTimeout = time.Minute + tickInterval = 500 * time.Millisecond + replicateStateInterval = time.Second * 5 ) // Run starts the background job. @@ -380,17 +332,38 @@ func (m *ModeManager) Run(ctx context.Context) { case <-ctx.Done(): return } - for { - select { - case <-time.After(tickInterval): - case <-ctx.Done(): - return + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for { + select { + case <-time.After(tickInterval): + case <-ctx.Done(): + return + } + m.tickUpdateState() } - m.tickDR() - } + }() + + go func() { + defer wg.Done() + for { + select { + case <-time.After(replicateStateInterval): + case <-ctx.Done(): + return + } + m.tickReplicateStatus() + } + }() + + wg.Wait() } -func (m *ModeManager) tickDR() { +func (m *ModeManager) tickUpdateState() { if m.getModeName() != modeDRAutoSync { return } @@ -483,8 +456,42 @@ func (m *ModeManager) tickDR() { } } } +} + +func (m *ModeManager) tickReplicateStatus() { + if m.getModeName() != modeDRAutoSync { + return + } + + m.RLock() + state := drAutoSyncStatus{ + State: m.drAutoSync.State, + StateID: m.drAutoSync.StateID, + AvailableStores: m.drAutoSync.AvailableStores, + RecoverStartTime: m.drAutoSync.RecoverStartTime, + } + m.RUnlock() - m.checkReplicateFile() + data, _ := json.Marshal(state) + + members, err := m.fileReplicater.GetMembers() + if err != nil { + log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) + return + } + for _, member := range members { + stateID, ok := m.replicateState.Load(member.GetMemberId()) + if !ok || stateID.(uint64) != state.StateID { + ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) + err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data) + if err != nil { + log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", state.State), errs.ZapError(err)) + } else { + m.replicateState.Store(member.GetMemberId(), state.StateID) + } + cancel() + } + } } const ( @@ -556,17 +563,6 @@ func (m *ModeManager) drCheckStoreStateUpdated(stores []uint64) bool { return true } -func (m *ModeManager) checkReplicateFile() { - members, err := m.fileReplicater.GetMembers() - if err != nil { - log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) - return - } - if m.drCheckNeedPersistStatus(members) { - m.drPersistStatus() - } -} - var ( regionScanBatchSize = 1024 
regionMinSampleSize = 512 diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 1dfd0bd8a69..7ddd4880011 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -184,6 +184,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateSync, rep.drGetState()) stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) @@ -197,9 +198,10 @@ func TestStateSwitch(t *testing.T) { } // only one zone, sync -> async_wait -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) @@ -208,112 +210,119 @@ func TestStateSwitch(t *testing.T) { re.True(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) syncStoreStatus(1, 2, 3, 4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) // add new store in dr zone. cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) cluster.AddLabersStoreWithLearnerCount(6, 1, 1, map[string]string{"zone": "zone2"}) // async -> sync - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) rep.drSwitchToSync() re.Equal(drStateSync, rep.drGetState()) assertStateIDUpdate() // sync -> async_wait - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) setStoreState(cluster, "down", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) setStoreState(cluster, "down", "down", "up", "up", "up", "up") setStoreState(cluster, "down", "down", "down", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) // cannot guarantee majority, keep sync. setStoreState(cluster, "up", "up", "up", "up", "up", "down") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) // once the voter node down, even learner node up, swith to async state. 
setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.drSwitchToSync() replicator.errors[2] = errors.New("fail to replicate") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() delete(replicator.errors, 1) // async_wait -> sync setStoreState(cluster, "up", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) // async_wait -> async_wait setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) setStoreState(cluster, "down", "up", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) setStoreState(cluster, "up", "down", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async_wait -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) syncStoreStatus(1, 3) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) syncStoreStatus(4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) // async -> async setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() // store 2 won't be available before it syncs status. + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) syncStoreStatus(1, 2, 3, 4) - rep.tickDR() + rep.tickUpdateState() assertStateIDUpdate() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) setStoreState(cluster, "down", "up", "up", "up", "up", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() // sync_recover -> async - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) setStoreState(cluster, "up", "up", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsync, rep.drGetState()) assertStateIDUpdate() // lost majority, does not switch to async. 
rep.drSwitchToSyncRecover() assertStateIDUpdate() setStoreState(cluster, "down", "down", "up", "up", "down", "up") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) // sync_recover -> sync @@ -327,7 +336,7 @@ func TestStateSwitch(t *testing.T) { State: pb.RegionReplicationState_SIMPLE_MAJORITY, })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) region = region.Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ @@ -335,14 +344,14 @@ func TestStateSwitch(t *testing.T) { StateId: rep.drAutoSync.StateID - 1, // mismatch state id })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) region = region.Clone(core.SetReplicationStatus(&pb.RegionReplicationStatus{ State: pb.RegionReplicationState_INTEGRITY_OVER_LABEL, StateId: rep.drAutoSync.StateID, })) cluster.PutRegion(region) - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateSync, rep.drGetState()) assertStateIDUpdate() } @@ -367,25 +376,27 @@ func TestReplicateState(t *testing.T) { stateID := rep.drAutoSync.StateID // replicate after initialized + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) // repliate state to new member replicator.memberIDs = append(replicator.memberIDs, 2, 3) - rep.checkReplicateFile() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3]) // inject error replicator.errors[2] = errors.New("failed to persist") - rep.tickDR() // switch async_wait since there is only one zone + rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[1]) re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[3]) // clear error, replicate to node 2 next time delete(replicator.errors, 2) - rep.checkReplicateFile() + rep.tickReplicateStatus() re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d}`, newStateID), replicator.lastData[2]) } @@ -412,7 +423,7 @@ func TestAsynctimeout(t *testing.T) { cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone2"}) setStoreState(cluster, "up", "up", "down") - rep.tickDR() + rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) }
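
For context on the pattern this diff adopts: a state switch now only saves the new status to storage, while a separate, slower goroutine (`tickReplicateStatus`, every `replicateStateInterval`) pushes the serialized status file to any member whose last acknowledged state ID is stale, tracked in a `sync.Map`. The sketch below is a minimal, self-contained illustration of that pattern, not PD code; `statusSyncer`, `Member`, `replicate`, and the intervals/timeout are made-up stand-ins.

```go
// Minimal sketch of the pattern this diff moves to: state switches only record
// the new status locally, and a slower background loop pushes the serialized
// status to every member that has not yet acknowledged the current state ID.
// All names, intervals, and timeouts here are illustrative, not the PD API.
package sketch

import (
	"context"
	"sync"
	"time"
)

// Member stands in for a PD member descriptor.
type Member struct{ ID uint64 }

type statusSyncer struct {
	mu      sync.RWMutex
	stateID uint64
	data    []byte   // serialized status to distribute
	acked   sync.Map // member ID -> last state ID successfully replicated
}

// setState records a new state locally; replication happens on the next tick.
func (s *statusSyncer) setState(stateID uint64, data []byte) {
	s.mu.Lock()
	s.stateID, s.data = stateID, data
	s.mu.Unlock()
}

// run mirrors the shape of the new ModeManager.Run: a fast tick drives state
// transitions while a slower tick replicates the status file to members.
func (s *statusSyncer) run(ctx context.Context, members func() []Member,
	replicate func(ctx context.Context, m Member, data []byte) error) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-time.After(500 * time.Millisecond):
			case <-ctx.Done():
				return
			}
			// state-machine update would run here (omitted in this sketch)
		}
	}()
	go func() {
		defer wg.Done()
		for {
			select {
			case <-time.After(5 * time.Second):
			case <-ctx.Done():
				return
			}
			s.replicateOnce(ctx, members(), replicate)
		}
	}()
	wg.Wait()
}

// replicateOnce pushes the current status to members that have not seen it;
// a member that fails the push is simply retried on a later tick.
func (s *statusSyncer) replicateOnce(ctx context.Context, members []Member,
	replicate func(ctx context.Context, m Member, data []byte) error) {
	s.mu.RLock()
	stateID, data := s.stateID, s.data
	s.mu.RUnlock()
	for _, m := range members {
		if last, ok := s.acked.Load(m.ID); ok && last.(uint64) == stateID {
			continue // member already holds the latest status file
		}
		cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
		if err := replicate(cctx, m, data); err == nil {
			s.acked.Store(m.ID, stateID)
		}
		cancel()
	}
}
```

Compared with the removed `drPersistStatusWithLock` path, replication failures need no special bookkeeping (the old `replicatedMembers` slice): a stale or missing entry in the map is enough to schedule a retry on the next tick, and the state switch itself never blocks on file replication.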