diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index bfceff6d1a333..0a0eba286648e 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -44,16 +44,18 @@ const ( MaxObsoletTimeLimit = time.Hour ) -// MPPSotreState the state for MPPStore. -type MPPSotreState struct { +// MPPStoreState the state for MPPStore. +type MPPStoreState struct { address string // MPPStore TiFlash address tikvClient tikv.Client - lock sync.Mutex + lock struct { + sync.Mutex - recoveryTime time.Time - lastLookupTime time.Time - lastDetectTime time.Time + recoveryTime time.Time + lastLookupTime time.Time + lastDetectTime time.Time + } } // MPPFailedStoreProber use for detecting of failed TiFlash instance @@ -71,40 +73,40 @@ type MPPFailedStoreProber struct { maxObsoletTimeLimit time.Duration } -func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { - if time.Since(t.lastDetectTime) < detectPeriod { +func (t *MPPStoreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { + if time.Since(t.lock.lastDetectTime) < detectPeriod { return } - defer func() { t.lastDetectTime = time.Now() }() + defer func() { t.lock.lastDetectTime = time.Now() }() metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) if !ok { metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) - t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. + t.lock.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. return } // record the time of the first recovery - if t.recoveryTime.IsZero() { - t.recoveryTime = time.Now() + if t.lock.recoveryTime.IsZero() { + t.lock.recoveryTime = time.Now() } } -func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { +func (t *MPPStoreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { if !t.lock.TryLock() { return false } defer t.lock.Unlock() - t.lastLookupTime = time.Now() - if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL { + t.lock.lastLookupTime = time.Now() + if !t.lock.recoveryTime.IsZero() && time.Since(t.lock.recoveryTime) > recoveryTTL { return true } logutil.Logger(ctx).Debug("Cannot detect store's availability "+ "because the current time has not recovery or wait mppStoreFailTTL", zap.String("store address", t.address), - zap.Time("recovery time", t.recoveryTime), + zap.Time("recovery time", t.lock.recoveryTime), zap.Duration("MPPStoreFailTTL", recoveryTTL)) return false } @@ -118,9 +120,9 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { do := func(k, v any) { address := fmt.Sprint(k) - state, ok := v.(*MPPSotreState) + state, ok := v.(*MPPStoreState) if !ok { - logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", + logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean", zap.String("address", address)) t.Delete(address) return @@ -134,10 +136,10 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit) // clean restored store - if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > t.maxRecoveryTimeLimit { + if !state.lock.recoveryTime.IsZero() && time.Since(state.lock.recoveryTime) > t.maxRecoveryTimeLimit { t.Delete(address) // clean store that may be obsolete - } else if state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > t.maxObsoletTimeLimit { + } else if state.lock.recoveryTime.IsZero() && time.Since(state.lock.lastLookupTime) > t.maxObsoletTimeLimit { t.Delete(address) } } @@ -153,11 +155,11 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) { // Add add a store when sync probe failed func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) { - state := MPPSotreState{ - address: address, - tikvClient: tikvClient, - lastLookupTime: time.Now(), + state := MPPStoreState{ + address: address, + tikvClient: tikvClient, } + state.lock.lastLookupTime = time.Now() logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address)) t.failedMPPStores.Store(address, &state) } @@ -172,9 +174,9 @@ func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context, address string, r return true } - state, ok := v.(*MPPSotreState) + state, ok := v.(*MPPStoreState) if !ok { - logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", + logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean", zap.String("address", address)) t.Delete(address) return false