From 18e1bde892e35f042b7cae1abac4d1a586573668 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 8 May 2018 08:29:24 -0700 Subject: [PATCH 1/5] healthcheck: refactor stream timeout code Instead of looping through every stream, this change makes every stream goroutine perform its own timeout check. Signed-off-by: Sugu Sougoumarane --- go/vt/discovery/healthcheck.go | 134 +++++++++++----------------- go/vt/discovery/healthcheck_test.go | 25 ++++++ 2 files changed, 75 insertions(+), 84 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 0719d90308d..cae7d963d16 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -297,13 +297,16 @@ type healthCheckConn struct { ctx context.Context cancelFunc context.CancelFunc + // healthCheckTimeout specifies how long to wait for + // a health check update. + healthCheckTimeout time.Duration + // mu protects all the following fields. // When locking both mutex from HealthCheck and healthCheckConn, // HealthCheck.mu goes first. // Note tabletStats.Tablet and tabletStats.Name are immutable. mu sync.RWMutex conn queryservice.QueryService - streamCancelFunc context.CancelFunc tabletStats TabletStats loggedServingState bool lastResponseTimestamp time.Time // timestamp of the last healthcheck response @@ -316,7 +319,6 @@ type HealthCheckImpl struct { sendDownEvents bool retryDelay time.Duration healthCheckTimeout time.Duration - closeChan chan struct{} // signals the process gorouting to terminate // wg keeps track of all launched Go routines. wg sync.WaitGroup @@ -351,34 +353,8 @@ func NewHealthCheck(retryDelay, healthCheckTimeout time.Duration) HealthCheck { addrToConns: make(map[string]*healthCheckConn), retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, - closeChan: make(chan struct{}), } - hc.wg.Add(1) - go func() { - defer hc.wg.Done() - // Start another go routine to check timeout. - // Currently vttablet sends healthcheck response every 20 seconds. - // We set the default timeout to 1 minute (20s * 3), - // and also perform the timeout check in sync with vttablet frequency. - // When we change the healthcheck frequency on vttablet, - // we should also adjust here. - t := time.NewTicker(healthCheckTimeout / 3) - defer t.Stop() - for { - select { - case <-hc.closeChan: - return - case _, ok := <-t.C: - if !ok { - // the ticker stoped - return - } - hc.checkHealthCheckTimeout() - } - } - }() - healthcheckOnce.Do(func() { http.Handle("/debug/gateway", hc) }) @@ -496,18 +472,56 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { retryDelay := hc.retryDelay for { - ctx, cancel := context.WithCancel(hcc.ctx) - hcc.mu.Lock() - hcc.streamCancelFunc = cancel - hcc.mu.Unlock() + streamCtx, streamCancel := context.WithCancel(hcc.ctx) + + // Setup a watcher that restarts the timer every time an update is received. + // If a timeout occurs for a serving tablet, we make it non-serving and send + // a status update. The stream is also terminated so it can be retried. + // servingStatus feeds into the serving var, which keeps track of the serving + // status transmitted by the tablet. + servingStatus := make(chan bool, 5) + serving := ts.Serving + go func() { + for { + select { + case serving = <-servingStatus: + continue + case <-time.After(hcc.healthCheckTimeout): + // Ignore if not serving. 
+ if !serving { + continue + } + hcc.mu.Lock() + hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) + hcc.setServingState(false, hcc.tabletStats.LastError.Error()) + hcc.mu.Unlock() + ts := hcc.tabletStats + if hc.listener != nil { + hc.listener.StatsUpdate(&ts) + } + hcErrorCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard, topoproto.TabletTypeLString(ts.Target.TabletType)}, 1) + streamCancel() + return + case <-streamCtx.Done(): + // This is a failsafe code path. If a stream terminates when + // a tablet is in not serving state, this function will loop + // forever. + return + } + } + }() // Read stream health responses. - hcc.stream(ctx, hc, func(shr *querypb.StreamHealthResponse) error { + hcc.stream(streamCtx, hc, func(shr *querypb.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay + servingStatus <- shr.Serving return hcc.processResponse(hc, shr) }) + // streamCancel to make sure the watcher goroutine terminates. + streamCancel() + // Streaming RPC failed e.g. because vttablet was restarted or took too long. // Sleep until the next retry is up or the context is done/canceled. select { @@ -667,54 +681,6 @@ func (hcc *healthCheckConn) update(shr *querypb.StreamHealthResponse, serving bo return hcc.tabletStats } -func (hc *HealthCheckImpl) checkHealthCheckTimeout() { - hc.mu.RLock() - list := make([]*healthCheckConn, 0, len(hc.addrToConns)) - for _, hcc := range hc.addrToConns { - list = append(list, hcc) - } - hc.mu.RUnlock() - for _, hcc := range list { - hcc.mu.RLock() - if !hcc.tabletStats.Serving { - // ignore non-serving tablet - hcc.mu.RUnlock() - continue - } - if time.Now().Sub(hcc.lastResponseTimestamp) < hc.healthCheckTimeout { - // received a healthcheck response recently - hcc.mu.RUnlock() - continue - } - hcc.mu.RUnlock() - // mark the tablet non-serving as we have not seen a health check response for a long time - hcc.mu.Lock() - // check again to avoid race condition - if !hcc.tabletStats.Serving { - // ignore non-serving tablet - hcc.mu.Unlock() - continue - } - if time.Now().Sub(hcc.lastResponseTimestamp) < hc.healthCheckTimeout { - // received a healthcheck response recently - hcc.mu.Unlock() - continue - } - - //Timeout detected. Cancel the current streaming RPC and let checkConn() restart it. - hcc.streamCancelFunc() - hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) - hcc.setServingState(false, hcc.tabletStats.LastError.Error()) - ts := hcc.tabletStats - hcc.mu.Unlock() - // notify downstream for serving status change - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - hcErrorCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard, topoproto.TabletTypeLString(ts.Target.TabletType)}, 1) - } -} - func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { hc.mu.Lock() defer hc.mu.Unlock() @@ -757,8 +723,9 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { ctx, cancelFunc := context.WithCancel(context.Background()) key := TabletToMapKey(tablet) hcc := &healthCheckConn{ - ctx: ctx, - cancelFunc: cancelFunc, + ctx: ctx, + cancelFunc: cancelFunc, + healthCheckTimeout: hc.healthCheckTimeout, tabletStats: TabletStats{ Key: key, Tablet: tablet, @@ -938,7 +905,6 @@ func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { // currently executing and won't be called again. 
func (hc *HealthCheckImpl) Close() error { hc.mu.Lock() - close(hc.closeChan) for _, hcc := range hc.addrToConns { hcc.cancelFunc() } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 1bb7d570145..616a15dfdff 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -545,6 +545,25 @@ func TestHealthCheckTimeout(t *testing.T) { t.Errorf("StreamHealth should be canceled after timeout, but is not") } + // repeat the wait. There should be no error or cancelation. + fc.resetCanceledFlag() + time.Sleep(2 * timeout) + t.Logf(`Sleep(2 * timeout)`) + + select { + case res = <-l.output: + t.Errorf(`<-l.output: %+v; want not message`, res) + default: + } + + if err := checkErrorCounter("k", "s", topodatapb.TabletType_MASTER, 1); err != nil { + t.Errorf("%v", err) + } + + if fc.isCanceled() { + t.Errorf("StreamHealth should not be canceled after timeout") + } + // send a healthcheck response, it should be serving again input <- shr t.Logf(`input <- {{Keyspace: "k", Shard: "s", TabletType: MASTER}, Serving: true, TabletExternallyReparentedTimestamp: 10, {SecondsBehindMaster: 1, CpuUsage: 0.2}}`) @@ -694,6 +713,12 @@ func (fc *fakeConn) isCanceled() bool { return fc.canceled } +func (fc *fakeConn) resetCanceledFlag() { + fc.mu.Lock() + defer fc.mu.Unlock() + fc.canceled = false +} + func checkErrorCounter(keyspace, shard string, tabletType topodatapb.TabletType, want int64) error { statsKey := []string{keyspace, shard, topoproto.TabletTypeLString(tabletType)} name := strings.Join(statsKey, ".") From 0f3a18e8a8b4dda7a4a2b4b5320ba83a1376b164 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 8 May 2018 15:17:22 -0700 Subject: [PATCH 2/5] healthcheck: refactor locking scheme This change gets rid of all locks in hcc. Instead we use the approach of "sharing by communicating". Whenever there is a change in state, hcc communicates the change to hc, which then performs the necessary updates and handling. Also, now that hc updates are trivial, the lock has been changed to a simple Mutex, which is more efficient that RWMutex. Signed-off-by: Sugu Sougoumarane --- go/vt/discovery/healthcheck.go | 271 +++++++++++++++------------------ 1 file changed, 125 insertions(+), 146 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index cae7d963d16..faee85dd00d 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -204,6 +204,12 @@ func (e *TabletStats) DeepEqual(f *TabletStats) bool { (e.LastError != nil && f.LastError != nil && e.LastError.Error() == f.LastError.Error())) } +// Copy produces a copy of TabletStats. +func (e *TabletStats) Copy() *TabletStats { + ts := *e + return &ts +} + // GetTabletHostPort formats a tablet host port address. func (e TabletStats) GetTabletHostPort() string { vtPort := e.Tablet.PortMap["vt"] @@ -294,24 +300,25 @@ type HealthCheck interface { // about a tablet. type healthCheckConn struct { // set at construction time - ctx context.Context - cancelFunc context.CancelFunc - - // healthCheckTimeout specifies how long to wait for - // a health check update. - healthCheckTimeout time.Duration + ctx context.Context // mu protects all the following fields. // When locking both mutex from HealthCheck and healthCheckConn, // HealthCheck.mu goes first. // Note tabletStats.Tablet and tabletStats.Name are immutable. 
- mu sync.RWMutex conn queryservice.QueryService tabletStats TabletStats loggedServingState bool lastResponseTimestamp time.Time // timestamp of the last healthcheck response } +// tabletHealth maintains the health status of a tablet fed by the health check connection. +type tabletHealth struct { + cancelFunc context.CancelFunc + conn queryservice.QueryService + tabletStats TabletStats +} + // HealthCheckImpl performs health checking and notifies downstream components about any changes. type HealthCheckImpl struct { // Immutable fields set at construction time. @@ -325,10 +332,10 @@ type HealthCheckImpl struct { // mu protects all the following fields. // When locking both mutex from HealthCheck and healthCheckConn, // HealthCheck.mu goes first. - mu sync.RWMutex + mu sync.Mutex - // addrToConns maps from address to the healthCheckConn object. - addrToConns map[string]*healthCheckConn + // addrToHealth maps from address to tabletHealth. + addrToHealth map[string]*tabletHealth // Wait group that's used to wait until all initial StatsUpdate() calls are made after the AddTablet() calls. initialUpdatesWG sync.WaitGroup @@ -350,7 +357,7 @@ func NewDefaultHealthCheck() HealthCheck { // not healthy. func NewHealthCheck(retryDelay, healthCheckTimeout time.Duration) HealthCheck { hc := &HealthCheckImpl{ - addrToConns: make(map[string]*healthCheckConn), + addrToHealth: make(map[string]*tabletHealth), retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, } @@ -394,16 +401,13 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) { // servingConnStats returns the number of serving tablets per keyspace/shard/tablet type. func (hc *HealthCheckImpl) servingConnStats() map[string]int64 { res := make(map[string]int64) - hc.mu.RLock() - defer hc.mu.RUnlock() - for _, hcc := range hc.addrToConns { - hcc.mu.RLock() - if !hcc.tabletStats.Up || !hcc.tabletStats.Serving || hcc.tabletStats.LastError != nil { - hcc.mu.RUnlock() + hc.mu.Lock() + defer hc.mu.Unlock() + for _, th := range hc.addrToHealth { + if !th.tabletStats.Up || !th.tabletStats.Serving || th.tabletStats.LastError != nil { continue } - key := fmt.Sprintf("%s.%s.%s", hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, topoproto.TabletTypeLString(hcc.tabletStats.Target.TabletType)) - hcc.mu.RUnlock() + key := fmt.Sprintf("%s.%s.%s", th.tabletStats.Target.Keyspace, th.tabletStats.Target.Shard, topoproto.TabletTypeLString(th.tabletStats.Target.TabletType)) res[key]++ } return res @@ -431,29 +435,62 @@ func (hc *HealthCheckImpl) stateChecksum() int64 { return int64(crc32.ChecksumIEEE(buf.Bytes())) } +// updateHealth updates the tabletHealth record and transmits the tablet stats +// to the listener. +func (hc *HealthCheckImpl) updateHealth(ts *TabletStats, conn queryservice.QueryService) { + // Unconditionally send the received update at the end. + defer func() { + if hc.listener != nil { + hc.listener.StatsUpdate(ts) + } + }() + + hc.mu.Lock() + th, ok := hc.addrToHealth[ts.Key] + if !ok { + // This can happen on delete because the entry is removed first. + hc.mu.Unlock() + return + } + oldts := th.tabletStats + th.tabletStats = *ts + th.conn = conn + hc.mu.Unlock() + + // In the case where a tablet changes type (but not for the + // initial message), we want to log it, and maybe advertise it too. 
+ if oldts.Target.TabletType != topodatapb.TabletType_UNKNOWN && oldts.Target.TabletType != ts.Target.TabletType { + // Log and maybe notify + log.Infof("HealthCheckUpdate(Type Change): %v, tablet: %s, target %+v => %+v, reparent time: %v", + oldts.Name, topotools.TabletIdent(oldts.Tablet), topotools.TargetIdent(oldts.Target), topotools.TargetIdent(ts.Target), ts.TabletExternallyReparentedTimestamp) + if hc.listener != nil && hc.sendDownEvents { + oldts.Up = false + hc.listener.StatsUpdate(&oldts) + } + + // Track how often a tablet gets promoted to master. It is used for + // comparing against the variables in go/vtgate/buffer/variables.go. + if oldts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Target.TabletType == topodatapb.TabletType_MASTER { + hcMasterPromotedCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard}, 1) + } + } +} + // finalizeConn closes the health checking connection and sends the final // notification about the tablet to downstream. To be called only on exit from // checkConn(). func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { - hcc.mu.Lock() - hccConn := hcc.conn - hccCtx := hcc.ctx - hcc.conn = nil + if hcc.conn != nil { + // Don't use hcc.ctx because it's already closed. + hcc.conn.Close(context.Background()) + hcc.conn = nil + } hcc.tabletStats.Up = false hcc.setServingState(false, "finalizeConn closing connection") // Note: checkConn() exits only when hcc.ctx.Done() is closed. Thus it's // safe to simply get Err() value here and assign to LastError. hcc.tabletStats.LastError = hcc.ctx.Err() - ts := hcc.tabletStats - hcc.mu.Unlock() - - if hccConn != nil { - hccConn.Close(hccCtx) - } - - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) } // checkConn performs health checking on the given tablet. @@ -462,12 +499,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { defer hc.finalizeConn(hcc) // Initial notification for downstream about the tablet existence. - hcc.mu.Lock() - ts := hcc.tabletStats - hcc.mu.Unlock() - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) hc.initialUpdatesWG.Done() retryDelay := hc.retryDelay @@ -480,32 +512,25 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { // servingStatus feeds into the serving var, which keeps track of the serving // status transmitted by the tablet. servingStatus := make(chan bool, 5) - serving := ts.Serving + serving := hcc.tabletStats.Serving + timedout := false go func() { for { select { case serving = <-servingStatus: continue - case <-time.After(hcc.healthCheckTimeout): + case <-time.After(hc.healthCheckTimeout): // Ignore if not serving. if !serving { continue } - hcc.mu.Lock() - hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) - hcc.setServingState(false, hcc.tabletStats.LastError.Error()) - hcc.mu.Unlock() - ts := hcc.tabletStats - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - hcErrorCounters.Add([]string{ts.Target.Keyspace, ts.Target.Shard, topoproto.TabletTypeLString(ts.Target.TabletType)}, 1) + timedout = true streamCancel() return case <-streamCtx.Done(): - // This is a failsafe code path. If a stream terminates when - // a tablet is in not serving state, this function will loop - // forever. + // If stream returns while serving is false, the function + // will get stuck in an infinite loop. This code path + // breaks the loop. 
return } } @@ -522,6 +547,16 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { // streamCancel to make sure the watcher goroutine terminates. streamCancel() + // If there was a timeout send an error. We do this after stream has returned. + // This will ensure that this update prevails over any previous message that + // stream could have sent. + if timedout { + hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) + hcc.setServingState(false, hcc.tabletStats.LastError.Error()) + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + hcErrorCounters.Add([]string{hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, topoproto.TabletTypeLString(hcc.tabletStats.Target.TabletType)}, 1) + } + // Streaming RPC failed e.g. because vttablet was restarted or took too long. // Sleep until the next retry is up or the context is done/canceled. select { @@ -562,41 +597,23 @@ func (hcc *healthCheckConn) setServingState(serving bool, reason string) { // stream streams healthcheck responses to callback. func (hcc *healthCheckConn) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*querypb.StreamHealthResponse) error) { - hcc.mu.Lock() - conn := hcc.conn - hcc.mu.Unlock() - - if conn == nil { - var err error - conn, err = tabletconn.GetDialer()(hcc.tabletStats.Tablet, grpcclient.FailFast(true)) + if hcc.conn == nil { + conn, err := tabletconn.GetDialer()(hcc.tabletStats.Tablet, grpcclient.FailFast(true)) if err != nil { - hcc.mu.Lock() hcc.tabletStats.LastError = err - hcc.mu.Unlock() return } - - hcc.mu.Lock() hcc.conn = conn hcc.tabletStats.LastError = nil - hcc.mu.Unlock() } - if err := conn.StreamHealth(ctx, callback); err != nil { - hcc.mu.Lock() + if err := hcc.conn.StreamHealth(ctx, callback); err != nil { + hcc.conn.Close(ctx) hcc.conn = nil hcc.setServingState(false, err.Error()) hcc.tabletStats.LastError = err - ts := hcc.tabletStats - hcc.mu.Unlock() - // notify downstream for serving status change - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - conn.Close(ctx) - return + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) } - return } // processResponse reads one health check response, and notifies HealthCheckStatsListener. @@ -612,10 +629,6 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St return fmt.Errorf("health stats is not valid: %v", shr) } - hcc.mu.RLock() - oldTs := hcc.tabletStats - hcc.mu.RUnlock() - // an app-level error from tablet, force serving state. var healthErr error serving := shr.Serving @@ -624,28 +637,10 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St serving = false } - // oldTs.Tablet.Alias.Uid may be 0 because the youtube internal mechanism uses a different + // hcc.TabletStats.Tablet.Alias.Uid may be 0 because the youtube internal mechanism uses a different // code path to initialize this value. If so, we should skip this check. - if shr.TabletAlias != nil && oldTs.Tablet.Alias.Uid != 0 && !proto.Equal(shr.TabletAlias, oldTs.Tablet.Alias) { - return fmt.Errorf("health stats mismatch, tablet %+v alias does not match response alias %v", oldTs.Tablet, shr.TabletAlias) - } - - // In the case where a tablet changes type (but not for the - // initial message), we want to log it, and maybe advertise it too. 
- if hcc.tabletStats.Target.TabletType != topodatapb.TabletType_UNKNOWN && hcc.tabletStats.Target.TabletType != shr.Target.TabletType { - // Log and maybe notify - log.Infof("HealthCheckUpdate(Type Change): %v, tablet: %s, target %+v => %+v, reparent time: %v", - oldTs.Name, topotools.TabletIdent(oldTs.Tablet), topotools.TargetIdent(oldTs.Target), topotools.TargetIdent(shr.Target), shr.TabletExternallyReparentedTimestamp) - if hc.listener != nil && hc.sendDownEvents { - oldTs.Up = false - hc.listener.StatsUpdate(&oldTs) - } - - // Track how often a tablet gets promoted to master. It is used for - // comparing against the variables in go/vtgate/buffer/variables.go. - if oldTs.Target.TabletType != topodatapb.TabletType_MASTER && shr.Target.TabletType == topodatapb.TabletType_MASTER { - hcMasterPromotedCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard}, 1) - } + if shr.TabletAlias != nil && hcc.tabletStats.Tablet.Alias.Uid != 0 && !proto.Equal(shr.TabletAlias, hcc.tabletStats.Tablet.Alias) { + return fmt.Errorf("health stats mismatch, tablet %+v alias does not match response alias %v", hcc.tabletStats.Tablet, shr.TabletAlias) } // In this case where a new tablet is initialized or a tablet type changes, we want to @@ -656,18 +651,6 @@ func (hcc *healthCheckConn) processResponse(hc *HealthCheckImpl, shr *querypb.St // Update our record, and notify downstream for tabletType and // realtimeStats change. - ts := hcc.update(shr, serving, healthErr) - if hc.listener != nil { - hc.listener.StatsUpdate(&ts) - } - return nil -} - -// update updates the stats of a healthCheckConn, and returns a copy -// of its tabletStats. -func (hcc *healthCheckConn) update(shr *querypb.StreamHealthResponse, serving bool, healthErr error) TabletStats { - hcc.mu.Lock() - defer hcc.mu.Unlock() hcc.lastResponseTimestamp = time.Now() hcc.tabletStats.Target = shr.Target hcc.tabletStats.TabletExternallyReparentedTimestamp = shr.TabletExternallyReparentedTimestamp @@ -678,7 +661,8 @@ func (hcc *healthCheckConn) update(shr *querypb.StreamHealthResponse, serving bo reason = "healthCheck update error: " + healthErr.Error() } hcc.setServingState(serving, reason) - return hcc.tabletStats + hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + return nil } func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { @@ -686,16 +670,14 @@ func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { defer hc.mu.Unlock() key := TabletToMapKey(tablet) - hcc, ok := hc.addrToConns[key] + th, ok := hc.addrToHealth[key] if !ok { log.Warningf("deleting unknown tablet: %+v", tablet) return } - hcc.mu.Lock() - hcc.tabletStats.Up = false - hcc.mu.Unlock() - hcc.cancelFunc() - delete(hc.addrToConns, key) + th.tabletStats.Up = false + th.cancelFunc() + delete(hc.addrToHealth, key) } // SetListener sets the listener for healthcheck updates. 
@@ -708,7 +690,7 @@ func (hc *HealthCheckImpl) SetListener(listener HealthCheckStatsListener, sendDo hc.mu.Lock() defer hc.mu.Unlock() - if len(hc.addrToConns) > 0 { + if len(hc.addrToHealth) > 0 { panic("must not call SetListener after tablets were added") } @@ -723,9 +705,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { ctx, cancelFunc := context.WithCancel(context.Background()) key := TabletToMapKey(tablet) hcc := &healthCheckConn{ - ctx: ctx, - cancelFunc: cancelFunc, - healthCheckTimeout: hc.healthCheckTimeout, + ctx: ctx, tabletStats: TabletStats{ Key: key, Tablet: tablet, @@ -735,12 +715,15 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { }, } hc.mu.Lock() - if _, ok := hc.addrToConns[key]; ok { + if _, ok := hc.addrToHealth[key]; ok { hc.mu.Unlock() log.Warningf("adding duplicate tablet %v for %v: %+v", name, tablet.Alias.Cell, tablet) return } - hc.addrToConns[key] = hcc + hc.addrToHealth[key] = &tabletHealth{ + cancelFunc: cancelFunc, + tabletStats: hcc.tabletStats, + } hc.initialUpdatesWG.Add(1) hc.mu.Unlock() @@ -770,16 +753,14 @@ func (hc *HealthCheckImpl) WaitForInitialStatsUpdates() { // GetConnection returns the TabletConn of the given tablet. func (hc *HealthCheckImpl) GetConnection(key string) queryservice.QueryService { - hc.mu.RLock() - hcc := hc.addrToConns[key] - if hcc == nil { - hc.mu.RUnlock() + hc.mu.Lock() + defer hc.mu.Unlock() + + th := hc.addrToHealth[key] + if th == nil { return nil } - hc.mu.RUnlock() - hcc.mu.RLock() - defer hcc.mu.RUnlock() - return hcc.conn + return th.conn } // TabletsCacheStatus is the current tablets for a cell/target. @@ -879,22 +860,20 @@ func (hc *HealthCheckImpl) CacheStatus() TabletsCacheStatusList { func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { tcsMap := make(map[string]*TabletsCacheStatus) - hc.mu.RLock() - defer hc.mu.RUnlock() - for _, hcc := range hc.addrToConns { - hcc.mu.RLock() - key := fmt.Sprintf("%v.%v.%v.%v", hcc.tabletStats.Tablet.Alias.Cell, hcc.tabletStats.Target.Keyspace, hcc.tabletStats.Target.Shard, hcc.tabletStats.Target.TabletType.String()) + hc.mu.Lock() + defer hc.mu.Unlock() + for _, th := range hc.addrToHealth { + key := fmt.Sprintf("%v.%v.%v.%v", th.tabletStats.Tablet.Alias.Cell, th.tabletStats.Target.Keyspace, th.tabletStats.Target.Shard, th.tabletStats.Target.TabletType.String()) var tcs *TabletsCacheStatus var ok bool if tcs, ok = tcsMap[key]; !ok { tcs = &TabletsCacheStatus{ - Cell: hcc.tabletStats.Tablet.Alias.Cell, - Target: hcc.tabletStats.Target, + Cell: th.tabletStats.Tablet.Alias.Cell, + Target: th.tabletStats.Target, } tcsMap[key] = tcs } - stats := hcc.tabletStats - hcc.mu.RUnlock() + stats := th.tabletStats tcs.TabletsStats = append(tcs.TabletsStats, &stats) } return tcsMap @@ -905,10 +884,10 @@ func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { // currently executing and won't be called again. func (hc *HealthCheckImpl) Close() error { hc.mu.Lock() - for _, hcc := range hc.addrToConns { - hcc.cancelFunc() + for _, th := range hc.addrToHealth { + th.cancelFunc() } - hc.addrToConns = make(map[string]*healthCheckConn) + hc.addrToHealth = make(map[string]*tabletHealth) // Release the lock early or a pending checkHealthCheckTimeout // cannot get a read lock on it. hc.mu.Unlock() From 2debc94dd21ec43de43a4017562ae23452b85cf3 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 25 May 2018 21:56:19 -0700 Subject: [PATCH 3/5] healthcheck: address review comments. 
Signed-off-by: Sugu Sougoumarane --- go/vt/discovery/healthcheck.go | 117 +++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 49 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index faee85dd00d..728ac9dfce7 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -53,6 +53,7 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo/topoproto" @@ -295,31 +296,13 @@ type HealthCheck interface { Close() error } -// healthCheckConn contains details about a tablet. -// It is used internally by HealthCheckImpl to keep all the info -// about a tablet. -type healthCheckConn struct { - // set at construction time - ctx context.Context - - // mu protects all the following fields. - // When locking both mutex from HealthCheck and healthCheckConn, - // HealthCheck.mu goes first. - // Note tabletStats.Tablet and tabletStats.Name are immutable. - conn queryservice.QueryService - tabletStats TabletStats - loggedServingState bool - lastResponseTimestamp time.Time // timestamp of the last healthcheck response -} - -// tabletHealth maintains the health status of a tablet fed by the health check connection. -type tabletHealth struct { - cancelFunc context.CancelFunc - conn queryservice.QueryService - tabletStats TabletStats -} - // HealthCheckImpl performs health checking and notifies downstream components about any changes. +// It contains a map of tabletHealth objects, each of which stores the health information for +// a tablet. A checkConn goroutine is spawned for each tabletHealh, which is responsible for +// keeping that tabletHealth up-to-date. This is done through callbacks to updateHealth. +// If checkConn terminates for any reason, it updates tabletHealth.Up as false. If a tabletHealth +// gets removed from the map, its cancelFunc gets called, which ensures that the associated +// checkConn goroutine eventually terminates. type HealthCheckImpl struct { // Immutable fields set at construction time. listener HealthCheckStatsListener @@ -330,8 +313,6 @@ type HealthCheckImpl struct { wg sync.WaitGroup // mu protects all the following fields. - // When locking both mutex from HealthCheck and healthCheckConn, - // HealthCheck.mu goes first. mu sync.Mutex // addrToHealth maps from address to tabletHealth. @@ -341,6 +322,37 @@ type HealthCheckImpl struct { initialUpdatesWG sync.WaitGroup } +// healthCheckConn is a structure that lives within the scope of +// the checkConn goroutine to maintain its internal state. Therefore, +// it does not require synchronization. Changes that are relevant to +// healthcheck are transmitted through calls to HealthCheckImpl.updateHealth. +// TODO(sougou): move this and associated functions to a separate file. +type healthCheckConn struct { + // set at construction time + ctx context.Context + + // mu protects all the following fields. + // When locking both mutex from HealthCheck and healthCheckConn, + // HealthCheck.mu goes first. + // Note tabletStats.Tablet and tabletStats.Name are immutable. + conn queryservice.QueryService + tabletStats TabletStats + loggedServingState bool + lastResponseTimestamp time.Time // timestamp of the last healthcheck response +} + +// tabletHealth maintains the health status of a tablet. A map of this +// structure is maintained in HealthCheckImpl. 
+type tabletHealth struct { + // cancelFunc must be called before discarding tabletHealth. + // This will ensure that the associated checkConn goroutine will terminate. + cancelFunc context.CancelFunc + // conn is the connection associated with the tablet. + conn queryservice.QueryService + // latestTabletStats stores the latest health stats of the tablet. + latestTabletStats TabletStats +} + // NewDefaultHealthCheck creates a new HealthCheck object with a default configuration. func NewDefaultHealthCheck() HealthCheck { return NewHealthCheck(DefaultHealthCheckRetryDelay, DefaultHealthCheckTimeout) @@ -404,10 +416,10 @@ func (hc *HealthCheckImpl) servingConnStats() map[string]int64 { hc.mu.Lock() defer hc.mu.Unlock() for _, th := range hc.addrToHealth { - if !th.tabletStats.Up || !th.tabletStats.Serving || th.tabletStats.LastError != nil { + if !th.latestTabletStats.Up || !th.latestTabletStats.Serving || th.latestTabletStats.LastError != nil { continue } - key := fmt.Sprintf("%s.%s.%s", th.tabletStats.Target.Keyspace, th.tabletStats.Target.Shard, topoproto.TabletTypeLString(th.tabletStats.Target.TabletType)) + key := fmt.Sprintf("%s.%s.%s", th.latestTabletStats.Target.Keyspace, th.latestTabletStats.Target.Shard, topoproto.TabletTypeLString(th.latestTabletStats.Target.TabletType)) res[key]++ } return res @@ -452,8 +464,8 @@ func (hc *HealthCheckImpl) updateHealth(ts *TabletStats, conn queryservice.Query hc.mu.Unlock() return } - oldts := th.tabletStats - th.tabletStats = *ts + oldts := th.latestTabletStats + th.latestTabletStats = *ts th.conn = conn hc.mu.Unlock() @@ -480,17 +492,17 @@ func (hc *HealthCheckImpl) updateHealth(ts *TabletStats, conn queryservice.Query // notification about the tablet to downstream. To be called only on exit from // checkConn(). func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { - if hcc.conn != nil { - // Don't use hcc.ctx because it's already closed. - hcc.conn.Close(context.Background()) - hcc.conn = nil - } hcc.tabletStats.Up = false hcc.setServingState(false, "finalizeConn closing connection") // Note: checkConn() exits only when hcc.ctx.Done() is closed. Thus it's // safe to simply get Err() value here and assign to LastError. hcc.tabletStats.LastError = hcc.ctx.Err() - hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + hc.updateHealth(hcc.tabletStats.Copy(), nil) + if hcc.conn != nil { + // Don't use hcc.ctx because it's already closed. + hcc.conn.Close(context.Background()) + hcc.conn = nil + } } // checkConn performs health checking on the given tablet. @@ -511,10 +523,13 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { // a status update. The stream is also terminated so it can be retried. // servingStatus feeds into the serving var, which keeps track of the serving // status transmitted by the tablet. - servingStatus := make(chan bool, 5) - serving := hcc.tabletStats.Serving - timedout := false + servingStatus := make(chan bool, 1) + // timedout is accessed atomically because there could be a race + // between the goroutine that sets it and the check for its value + // later. 
+ timedout := sync2.NewAtomicBool(false) go func() { + serving := hcc.tabletStats.Serving for { select { case serving = <-servingStatus: @@ -524,7 +539,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { if !serving { continue } - timedout = true + timedout.Set(true) streamCancel() return case <-streamCtx.Done(): @@ -540,7 +555,11 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { hcc.stream(streamCtx, hc, func(shr *querypb.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay - servingStatus <- shr.Serving + // Don't block on send to avoid deadlocks. + select { + case servingStatus <- shr.Serving: + default: + } return hcc.processResponse(hc, shr) }) @@ -550,7 +569,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { // If there was a timeout send an error. We do this after stream has returned. // This will ensure that this update prevails over any previous message that // stream could have sent. - if timedout { + if timedout.Get() { hcc.tabletStats.LastError = fmt.Errorf("healthcheck timed out (latest %v)", hcc.lastResponseTimestamp) hcc.setServingState(false, hcc.tabletStats.LastError.Error()) hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) @@ -675,7 +694,7 @@ func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { log.Warningf("deleting unknown tablet: %+v", tablet) return } - th.tabletStats.Up = false + th.latestTabletStats.Up = false th.cancelFunc() delete(hc.addrToHealth, key) } @@ -721,8 +740,8 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { return } hc.addrToHealth[key] = &tabletHealth{ - cancelFunc: cancelFunc, - tabletStats: hcc.tabletStats, + cancelFunc: cancelFunc, + latestTabletStats: hcc.tabletStats, } hc.initialUpdatesWG.Add(1) hc.mu.Unlock() @@ -863,17 +882,17 @@ func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { hc.mu.Lock() defer hc.mu.Unlock() for _, th := range hc.addrToHealth { - key := fmt.Sprintf("%v.%v.%v.%v", th.tabletStats.Tablet.Alias.Cell, th.tabletStats.Target.Keyspace, th.tabletStats.Target.Shard, th.tabletStats.Target.TabletType.String()) + key := fmt.Sprintf("%v.%v.%v.%v", th.latestTabletStats.Tablet.Alias.Cell, th.latestTabletStats.Target.Keyspace, th.latestTabletStats.Target.Shard, th.latestTabletStats.Target.TabletType.String()) var tcs *TabletsCacheStatus var ok bool if tcs, ok = tcsMap[key]; !ok { tcs = &TabletsCacheStatus{ - Cell: th.tabletStats.Tablet.Alias.Cell, - Target: th.tabletStats.Target, + Cell: th.latestTabletStats.Tablet.Alias.Cell, + Target: th.latestTabletStats.Target, } tcsMap[key] = tcs } - stats := th.tabletStats + stats := th.latestTabletStats tcs.TabletsStats = append(tcs.TabletsStats, &stats) } return tcsMap From a939f11acf6312b5816bbeb2e58d88e4682bd8df Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 26 May 2018 09:16:44 -0700 Subject: [PATCH 4/5] healthcheck: fix data race It turns out that moving the serving assignment within the goroutine introduces a data race. I've moved it back out. Also found another incidental data race: did you know that functions like t.Fatalf should not be called from goroutines? 
Signed-off-by: Sugu Sougoumarane --- go/vt/discovery/healthcheck.go | 2 +- go/vt/worker/split_clone_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 728ac9dfce7..7a9a53830b7 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -528,8 +528,8 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { // between the goroutine that sets it and the check for its value // later. timedout := sync2.NewAtomicBool(false) + serving := hcc.tabletStats.Serving go func() { - serving := hcc.tabletStats.Serving for { select { case serving = <-servingStatus: diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 184e87d1dc1..993771b2232 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -1019,7 +1019,7 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { select { case <-ctx.Done(): - t.Fatalf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err()) + panic(fmt.Errorf("timed out waiting for vtworker to retry due to NoMasterAvailable: %v", ctx.Err())) case <-time.After(10 * time.Millisecond): // Poll constantly. } From de45de683908e4252c77becf3e43fffb38056d5d Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 9 Oct 2018 15:58:13 -0700 Subject: [PATCH 5/5] healthcheck: address comments Also found a race between Close and other calls that modify state. That case is also fixed. Signed-off-by: Sugu Sougoumarane --- go/vt/discovery/healthcheck.go | 40 +++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 7a9a53830b7..7fde2774bd4 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -298,7 +298,7 @@ type HealthCheck interface { // HealthCheckImpl performs health checking and notifies downstream components about any changes. // It contains a map of tabletHealth objects, each of which stores the health information for -// a tablet. A checkConn goroutine is spawned for each tabletHealh, which is responsible for +// a tablet. A checkConn goroutine is spawned for each tabletHealth, which is responsible for // keeping that tabletHealth up-to-date. This is done through callbacks to updateHealth. // If checkConn terminates for any reason, it updates tabletHealth.Up as false. If a tabletHealth // gets removed from the map, its cancelFunc gets called, which ensures that the associated @@ -309,8 +309,8 @@ type HealthCheckImpl struct { sendDownEvents bool retryDelay time.Duration healthCheckTimeout time.Duration - // wg keeps track of all launched Go routines. - wg sync.WaitGroup + // connsWG keeps track of all launched Go routines that monitor tablet connections. + connsWG sync.WaitGroup // mu protects all the following fields. mu sync.Mutex @@ -328,13 +328,8 @@ type HealthCheckImpl struct { // healthcheck are transmitted through calls to HealthCheckImpl.updateHealth. // TODO(sougou): move this and associated functions to a separate file. type healthCheckConn struct { - // set at construction time ctx context.Context - // mu protects all the following fields. - // When locking both mutex from HealthCheck and healthCheckConn, - // HealthCheck.mu goes first. - // Note tabletStats.Tablet and tabletStats.Name are immutable. 
conn queryservice.QueryService tabletStats TabletStats loggedServingState bool @@ -460,7 +455,8 @@ func (hc *HealthCheckImpl) updateHealth(ts *TabletStats, conn queryservice.Query hc.mu.Lock() th, ok := hc.addrToHealth[ts.Key] if !ok { - // This can happen on delete because the entry is removed first. + // This can happen on delete because the entry is removed first, + // or if HealthCheckImpl has been closed. hc.mu.Unlock() return } @@ -500,14 +496,17 @@ func (hc *HealthCheckImpl) finalizeConn(hcc *healthCheckConn) { hc.updateHealth(hcc.tabletStats.Copy(), nil) if hcc.conn != nil { // Don't use hcc.ctx because it's already closed. - hcc.conn.Close(context.Background()) + // Use a separate context, and add a timeout to prevent unbounded waits. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + hcc.conn.Close(ctx) hcc.conn = nil } } // checkConn performs health checking on the given tablet. func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) { - defer hc.wg.Done() + defer hc.connsWG.Done() defer hc.finalizeConn(hcc) // Initial notification for downstream about the tablet existence. @@ -627,11 +626,12 @@ func (hcc *healthCheckConn) stream(ctx context.Context, hc *HealthCheckImpl, cal } if err := hcc.conn.StreamHealth(ctx, callback); err != nil { - hcc.conn.Close(ctx) - hcc.conn = nil hcc.setServingState(false, err.Error()) hcc.tabletStats.LastError = err - hc.updateHealth(hcc.tabletStats.Copy(), hcc.conn) + // Send nil because we intend to close the connection. + hc.updateHealth(hcc.tabletStats.Copy(), nil) + hcc.conn.Close(ctx) + hcc.conn = nil } } @@ -691,7 +691,6 @@ func (hc *HealthCheckImpl) deleteConn(tablet *topodatapb.Tablet) { key := TabletToMapKey(tablet) th, ok := hc.addrToHealth[key] if !ok { - log.Warningf("deleting unknown tablet: %+v", tablet) return } th.latestTabletStats.Up = false @@ -734,6 +733,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { }, } hc.mu.Lock() + if hc.addrToHealth == nil { + // already closed. + hc.mu.Unlock() + return + } if _, ok := hc.addrToHealth[key]; ok { hc.mu.Unlock() log.Warningf("adding duplicate tablet %v for %v: %+v", name, tablet.Alias.Cell, tablet) @@ -744,9 +748,9 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) { latestTabletStats: hcc.tabletStats, } hc.initialUpdatesWG.Add(1) + hc.connsWG.Add(1) hc.mu.Unlock() - hc.wg.Add(1) go hc.checkConn(hcc, name) } @@ -906,14 +910,14 @@ func (hc *HealthCheckImpl) Close() error { for _, th := range hc.addrToHealth { th.cancelFunc() } - hc.addrToHealth = make(map[string]*tabletHealth) + hc.addrToHealth = nil // Release the lock early or a pending checkHealthCheckTimeout // cannot get a read lock on it. hc.mu.Unlock() // Wait for the checkHealthCheckTimeout Go routine and each Go // routine per tablet. - hc.wg.Wait() + hc.connsWG.Wait() return nil }
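
Taken together, the series replaces the central checkHealthCheckTimeout loop with a per-stream watcher goroutine: each health-check update resets the timer, a timeout on a serving tablet flips an atomic flag and cancels the stream, and the flag is checked only after stream() returns so the timeout error prevails over any late update. Below is a minimal standalone sketch of that pattern, not the code in go/vt/discovery/healthcheck.go: the name watchTimeout is hypothetical, and the standard library's atomic.Bool stands in for Vitess's sync2.AtomicBool.

// Standalone sketch of the per-stream timeout pattern described in the
// patches above. Names and types are simplified stand-ins, not the real
// Vitess API.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// watchTimeout mirrors the watcher goroutine added to checkConn: updates on
// servingStatus reset the timer, a timeout while serving flips the atomic
// flag and cancels the stream, and streamCtx.Done() is the failsafe exit.
func watchTimeout(streamCtx context.Context, streamCancel context.CancelFunc,
	servingStatus <-chan bool, timeout time.Duration, timedout *atomic.Bool, serving bool) {
	for {
		select {
		case serving = <-servingStatus:
			continue
		case <-time.After(timeout):
			if !serving {
				continue // ignore timeouts while the tablet is not serving
			}
			timedout.Store(true)
			streamCancel()
			return
		case <-streamCtx.Done():
			return
		}
	}
}

func main() {
	streamCtx, streamCancel := context.WithCancel(context.Background())
	defer streamCancel()

	// Buffered channel plus non-blocking send, so the stream callback never
	// blocks on the watcher.
	servingStatus := make(chan bool, 1)
	var timedout atomic.Bool

	go watchTimeout(streamCtx, streamCancel, servingStatus, time.Second, &timedout, true)

	// Simulate a stream callback delivering one update, then going silent.
	select {
	case servingStatus <- true:
	default:
	}

	<-streamCtx.Done() // the watcher cancels the stream after the timeout
	if timedout.Load() {
		fmt.Println("healthcheck timed out; stream canceled for retry")
	}
}

Note that serving is evaluated by the caller and passed in, mirroring the patch 4 fix that moved the assignment out of the goroutine to avoid a data race on hcc.tabletStats.

The second theme, the locking refactor of patch 2, follows the same "share by communicating" idea: the checkConn goroutine owns its healthCheckConn state without locks and reports every change by handing a copy of the stats to updateHealth, which is the only code that touches the shared map under HealthCheckImpl.mu. A compact sketch under the same caveats (simplified stand-in types, not the real structures) looks roughly like this:

// Standalone sketch of the single-synchronization-point scheme from patch 2.
package main

import (
	"fmt"
	"sync"
)

type TabletStats struct {
	Key     string
	Serving bool
}

type tabletHealth struct {
	latestTabletStats TabletStats
}

type HealthCheckImpl struct {
	mu           sync.Mutex
	addrToHealth map[string]*tabletHealth
}

// updateHealth copies the reported stats into the shared map under the only
// lock; in the real code it would also notify the listener from here.
func (hc *HealthCheckImpl) updateHealth(ts *TabletStats) {
	hc.mu.Lock()
	defer hc.mu.Unlock()
	th, ok := hc.addrToHealth[ts.Key]
	if !ok {
		return // entry already deleted, or HealthCheckImpl closed
	}
	th.latestTabletStats = *ts
}

func main() {
	hc := &HealthCheckImpl{addrToHealth: map[string]*tabletHealth{"t1": {}}}

	// Goroutine-local state: no lock needed; only updateHealth is shared.
	local := TabletStats{Key: "t1", Serving: true}
	snapshot := local // hand over a copy, never the live struct
	hc.updateHealth(&snapshot)

	hc.mu.Lock()
	fmt.Println(hc.addrToHealth["t1"].latestTabletStats.Serving) // true
	hc.mu.Unlock()
}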