diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 3e91d069..6d4ab17d 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1025,6 +1025,8 @@ const ( ControllerNumOpenCGExtents // ControllerNumConsumedCGExtents represents the count of consumed cg extents ControllerNumConsumedCGExtents + // ControllerNoActiveZone indicates there's no active zone from dynamic config + ControllerNoActiveZone // -- Replicator metrics -- // @@ -1048,6 +1050,8 @@ const ( // ReplicatorOutConnMsgRead indicates how many messages OutConn read ReplicatorOutConnMsgRead + // ReplicatorReconcileDestRun indicates the reconcile fails + ReplicatorReconcileFail // ReplicatorReconcileDestRun indicates the reconcile for dest runs ReplicatorReconcileDestRun // ReplicatorReconcileDestFail indicates the reconcile for dest fails @@ -1232,6 +1236,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{ ControllerRetentionJobDuration: {Timer, "controller.retentionmgr.jobduration"}, ControllerGetAddressLatency: {Timer, "controller.retentionmgr.getaddresslatency"}, ControllerPurgeMessagesLatency: {Timer, "controller.retentionmgr.purgemessageslatency"}, + ControllerNoActiveZone: {Gauge, "controller.no-active-zone"}, }, // definitions for Replicator metrics @@ -1245,6 +1250,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{ ReplicatorInConnMsgWritten: {Counter, "replicator.inconn.msgwritten"}, ReplicatorOutConnCreditsSent: {Counter, "replicator.outconn.creditssent"}, ReplicatorOutConnMsgRead: {Counter, "replicator.outconn.msgread"}, + ReplicatorReconcileFail: {Gauge, "replicator.reconcile.fail"}, ReplicatorReconcileDestRun: {Gauge, "replicator.reconcile.dest.run"}, ReplicatorReconcileDestFail: {Gauge, "replicator.reconcile.dest.fail"}, ReplicatorReconcileDestFoundMissing: {Gauge, "replicator.reconcile.dest.foundmissing"}, diff --git a/services/controllerhost/consumer.go b/services/controllerhost/consumer.go index 543b42c5..d9833563 100644 --- a/services/controllerhost/consumer.go +++ b/services/controllerhost/consumer.go @@ -764,6 +764,7 @@ func refreshOutputHostsForConsGroup(context *Context, var dstType = getDstType(dstDesc) var outputAddrs []string var outputIDs []string + var consumeDisabled bool var outputHosts map[string]*common.HostInfo cgDesc, err := context.mm.ReadConsumerGroup(dstID, "", cgID, "") @@ -782,11 +783,12 @@ func refreshOutputHostsForConsGroup(context *Context, context.resultCache.write(cgID, resultCacheParams{ - dstType: dstType, - nExtents: nConsumable, - maxExtents: maxExtentsToConsume, - hostIDs: outputIDs, - expiry: now + ttl, + dstType: dstType, + nExtents: nConsumable, + maxExtents: maxExtentsToConsume, + hostIDs: outputIDs, + consumeDisabled: consumeDisabled, + expiry: now + ttl, }) } @@ -799,7 +801,8 @@ func refreshOutputHostsForConsGroup(context *Context, } // If we shouldn't consume in this zone(for a multi_zone cg), short circuit and return - if !shouldConsumeInZone(context.localZone, cgDesc, cfg) { + if !shouldConsumeInZone(context, m3Scope, cgDesc, cfg) { + consumeDisabled = true writeToCache(int64(outputCacheTTL)) return outputAddrs, nil } @@ -859,21 +862,24 @@ func refreshOutputHostsForConsGroup(context *Context, // shouldConsumeInZone indicated whether we should consume from this zone for a multi_zone consumer group // If failover mode is enabled in dynamic config, the active zone will be the one specified in dynamic config // Otherwise, use the per cg override if it's specified -// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return true -func shouldConsumeInZone(zone string, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool { +// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return false +func shouldConsumeInZone(context *Context, m3Scope int, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool { if strings.EqualFold(dConfig.FailoverMode, `enabled`) { - return strings.EqualFold(zone, dConfig.ActiveZone) + return strings.EqualFold(context.localZone, dConfig.ActiveZone) } - if cgDesc.IsSetActiveZone() { - return strings.EqualFold(zone, cgDesc.GetActiveZone()) + if len(cgDesc.GetActiveZone()) > 0 { + return strings.EqualFold(context.localZone, cgDesc.GetActiveZone()) } if len(dConfig.ActiveZone) > 0 { - return strings.EqualFold(zone, dConfig.ActiveZone) + return strings.EqualFold(context.localZone, dConfig.ActiveZone) } - return true + context.log.Warn(`no active zone from dynamic config !`) + context.m3Client.UpdateGauge(m3Scope, metrics.ControllerNoActiveZone, 1) + + return false } func getControllerDynamicConfig(context *Context) (ControllerDynamicConfig, error) { diff --git a/services/controllerhost/controllerhost.go b/services/controllerhost/controllerhost.go index a4e93ffa..c8e299f9 100644 --- a/services/controllerhost/controllerhost.go +++ b/services/controllerhost/controllerhost.go @@ -384,25 +384,25 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques return nil, ErrMalformedUUID } - response := func(outputHostIDs []string, err error) (*c.GetOutputHostsResult_, error) { - if len(outputHostIDs) < 1 { - // only count as failure if our answer contains no endpoints at all + response := func() (*c.GetOutputHostsResult_, error) { + if len(result.cachedResult) < 1 && !result.consumeDisabled { + // only count as failure if our answer contains no endpoints at all and consuming is not disabled context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerFailures) - return nil, err + return nil, ErrUnavailable } - return &c.GetOutputHostsResult_{OutputHostIds: outputHostIDs}, nil + return &c.GetOutputHostsResult_{OutputHostIds: result.cachedResult}, nil } var now = context.timeSource.Now().UnixNano() result = context.resultCache.readOutputHosts(cgUUID, now) if result.cacheHit && !result.refreshCache { - return response(result.cachedResult, ErrUnavailable) + return response() } if !context.dstLock.TryLock(dstUUID, getLockTimeout(result)) { context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrTryLockCounter) - return response(result.cachedResult, ErrTryLock) + return response() } // With the lock being held, make sure someone else did not already @@ -410,7 +410,7 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques result = context.resultCache.readOutputHosts(cgUUID, now) if result.cacheHit && !result.refreshCache { context.dstLock.Unlock(dstUUID) - return response(result.cachedResult, ErrUnavailable) + return response() } hostIDs, err := refreshOutputHostsForConsGroup(context, dstUUID, cgUUID, *result, now) @@ -420,10 +420,10 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrBadEntityCounter) return nil, err } - return response(result.cachedResult, &shared.InternalServiceError{Message: err.Error()}) + return response() } - return response(hostIDs, ErrUnavailable) + return &c.GetOutputHostsResult_{OutputHostIds: hostIDs}, nil } // GetQueueDepthInfo to return queue depth backlog infor for consumer group diff --git a/services/controllerhost/resultcache.go b/services/controllerhost/resultcache.go index de730304..834d392e 100644 --- a/services/controllerhost/resultcache.go +++ b/services/controllerhost/resultcache.go @@ -42,22 +42,25 @@ type ( nExtents int maxExtents int dstType - hostIDs []string + hostIDs []string + consumeDisabled bool } resultCacheReadResult struct { - cachedResult []string - cacheHit bool - refreshCache bool + cachedResult []string + consumeDisabled bool + cacheHit bool + refreshCache bool *resultCacheEntry } resultCacheParams struct { - dstType dstType - nExtents int - maxExtents int - hostIDs []string - expiry int64 + dstType dstType + nExtents int + maxExtents int + hostIDs []string + consumeDisabled bool + expiry int64 } ) @@ -108,6 +111,7 @@ func (cache *resultCache) write(key string, write resultCacheParams) { cacheEntry.dstType = write.dstType cacheEntry.expiry = write.expiry cacheEntry.hostIDs = write.hostIDs + cacheEntry.consumeDisabled = write.consumeDisabled cacheEntry.nExtents = write.nExtents cacheEntry.maxExtents = write.maxExtents cache.Put(key, cacheEntry) @@ -135,6 +139,7 @@ func readResultCache(cache *resultCache, key string, svc string, now int64) *res return &resultCacheReadResult{ cachedResult: cachedResult, + consumeDisabled: cacheEntry.consumeDisabled, cacheHit: cacheHit, refreshCache: refreshCache, resultCacheEntry: cacheEntry, @@ -142,6 +147,7 @@ func readResultCache(cache *resultCache, key string, svc string, now int64) *res } return &resultCacheReadResult{ cachedResult: cachedResult, + consumeDisabled: false, cacheHit: cacheHit, refreshCache: refreshCache, resultCacheEntry: &resultCacheEntry{}, diff --git a/services/frontendhost/frontend.go b/services/frontendhost/frontend.go index 08530007..16f881ca 100644 --- a/services/frontendhost/frontend.go +++ b/services/frontendhost/frontend.go @@ -1015,15 +1015,15 @@ func (h *Frontend) ReadConsumerGroupHosts(ctx thrift.Context, readRequest *c.Rea getOutputHostReq := &controller.GetOutputHostsRequest{DestinationUUID: common.StringPtr(mCGDesc.GetDestinationUUID()), ConsumerGroupUUID: common.StringPtr(mCGDesc.GetConsumerGroupUUID())} getOutputHostResp, err := cClient.GetOutputHosts(ctx, getOutputHostReq) - if err != nil || len(getOutputHostResp.GetOutputHostIds()) < 1 { - if err != nil { - lclLg = lclLg.WithField(common.TagErr, err) - } - - lclLg.Error("No hosts returned from controller") + if err != nil { + lclLg.WithField(common.TagErr, err).Error(`error getting hosts from controller`) return nil, err } + if len(getOutputHostResp.GetOutputHostIds()) < 1 { + return c.NewReadConsumerGroupHostsResult_(), nil + } + outputHostIds := getOutputHostResp.GetOutputHostIds() // Build our result diff --git a/services/replicator/metadataReconciler.go b/services/replicator/metadataReconciler.go index eb556065..18b2b381 100644 --- a/services/replicator/metadataReconciler.go +++ b/services/replicator/metadataReconciler.go @@ -119,21 +119,34 @@ func (r *metadataReconciler) run() { return } + // Get the local destination and cgs + localDests, err := r.getAllMultiZoneDestInLocalZone() + if err != nil { + r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1) + return + } + localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests) + if err != nil { + r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1) + return + } + // destination/cg metadata reconciliation is only needed if this is a non-authoritative zone - var localCgs []*shared.ConsumerGroupDescription if r.localZone != r.replicator.getAuthoritativeZone() { r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestRun, 1) - localDests, remoteDests, err2 := r.reconcileDestMetadata() - if err2 != nil { + authoritativeZoneDests, err := r.getAllMultiZoneDestInAuthoritativeZone() + if err != nil { r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestFail, 1) - return - } + } else { + r.reconcileDest(localDests, authoritativeZoneDests) - r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1) - localCgs, err = r.reconcileCgMetadata(localDests, remoteDests) - if err != nil { - r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1) - return + r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1) + authoritativeZoneCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(authoritativeZoneDests) + if err != nil { + r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1) + } else { + r.reconcileCg(localCgs, authoritativeZoneCgs) + } } } @@ -154,36 +167,6 @@ func (r *metadataReconciler) run() { atomic.StoreInt64(&r.running, 0) } -func (r *metadataReconciler) reconcileDestMetadata() ([]*shared.DestinationDescription, []*shared.DestinationDescription, error) { - localDests, err := r.getAllMultiZoneDestInLocalZone() - if err != nil { - return nil, nil, err - } - - remoteDests, err := r.getAllMultiZoneDestInAuthoritativeZone() - if err != nil { - return nil, nil, err - } - - r.reconcileDest(localDests, remoteDests) - return localDests, remoteDests, nil -} - -func (r *metadataReconciler) reconcileCgMetadata(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) ([]*shared.ConsumerGroupDescription, error) { - localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests) - if err != nil { - return nil, err - } - - remoteCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(remoteDests) - if err != nil { - return nil, err - } - - r.reconcileCg(localCgs, remoteCgs) - return localCgs, nil -} - func (r *metadataReconciler) reconcileDest(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) { var replicatorReconcileDestFoundMissingCount int64 localDestsSet := make(map[string]*shared.DestinationDescription, len(localDests)) diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 2036eff1..9b637d32 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -1064,7 +1064,6 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs return err } - lcllg.Info(`Ack offset updated in metadata`) return nil }