Skip to content

Commit

Permalink
fix some problem
Browse files Browse the repository at this point in the history
  • Loading branch information
hackersean committed Dec 22, 2022
1 parent 59b45b8 commit 18226b2
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions store/copr/mpp_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@ const (
MaxObsoletTimeLimit = time.Hour
)

// MPPSotreState the state for MPPStore.
type MPPSotreState struct {
// MPPStoreState the state for MPPStore.
type MPPStoreState struct {
address string // MPPStore TiFlash address
tikvClient tikv.Client

lock sync.Mutex
lock struct {
sync.Mutex

recoveryTime time.Time
lastLookupTime time.Time
lastDetectTime time.Time
recoveryTime time.Time
lastLookupTime time.Time
lastDetectTime time.Time
}
}

// MPPFailedStoreProber use for detecting of failed TiFlash instance
Expand All @@ -71,40 +73,40 @@ type MPPFailedStoreProber struct {
maxObsoletTimeLimit time.Duration
}

func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) {
if time.Since(t.lastDetectTime) < detectPeriod {
func (t *MPPStoreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) {
if time.Since(t.lock.lastDetectTime) < detectPeriod {
return
}

defer func() { t.lastDetectTime = time.Now() }()
defer func() { t.lock.lastDetectTime = time.Now() }()
metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0)
ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit)
if !ok {
metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1)
t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero.
t.lock.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero.
return
}

// record the time of the first recovery
if t.recoveryTime.IsZero() {
t.recoveryTime = time.Now()
if t.lock.recoveryTime.IsZero() {
t.lock.recoveryTime = time.Now()
}
}

func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool {
func (t *MPPStoreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool {
if !t.lock.TryLock() {
return false
}
defer t.lock.Unlock()

t.lastLookupTime = time.Now()
if !t.recoveryTime.IsZero() && time.Since(t.recoveryTime) > recoveryTTL {
t.lock.lastLookupTime = time.Now()
if !t.lock.recoveryTime.IsZero() && time.Since(t.lock.recoveryTime) > recoveryTTL {
return true
}
logutil.Logger(ctx).Debug("Cannot detect store's availability "+
"because the current time has not recovery or wait mppStoreFailTTL",
zap.String("store address", t.address),
zap.Time("recovery time", t.recoveryTime),
zap.Time("recovery time", t.lock.recoveryTime),
zap.Duration("MPPStoreFailTTL", recoveryTTL))
return false
}
Expand All @@ -118,9 +120,9 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) {

do := func(k, v any) {
address := fmt.Sprint(k)
state, ok := v.(*MPPSotreState)
state, ok := v.(*MPPStoreState)
if !ok {
logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean",
logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean",
zap.String("address", address))
t.Delete(address)
return
Expand All @@ -134,10 +136,10 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) {
state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit)

// clean restored store
if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > t.maxRecoveryTimeLimit {
if !state.lock.recoveryTime.IsZero() && time.Since(state.lock.recoveryTime) > t.maxRecoveryTimeLimit {
t.Delete(address)
// clean store that may be obsolete
} else if state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > t.maxObsoletTimeLimit {
} else if state.lock.recoveryTime.IsZero() && time.Since(state.lock.lastLookupTime) > t.maxObsoletTimeLimit {
t.Delete(address)
}
}
Expand All @@ -153,11 +155,11 @@ func (t MPPFailedStoreProber) scan(ctx context.Context) {

// Add add a store when sync probe failed
func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) {
state := MPPSotreState{
address: address,
tikvClient: tikvClient,
lastLookupTime: time.Now(),
state := MPPStoreState{
address: address,
tikvClient: tikvClient,
}
state.lock.lastLookupTime = time.Now()
logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address))
t.failedMPPStores.Store(address, &state)
}
Expand All @@ -172,9 +174,9 @@ func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context, address string, r
return true
}

state, ok := v.(*MPPSotreState)
state, ok := v.(*MPPStoreState)
if !ok {
logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean",
logutil.BgLogger().Warn("MPPStoreState struct assert failed,will be clean",
zap.String("address", address))
t.Delete(address)
return false
Expand Down

0 comments on commit 18226b2

Please sign in to comment.