Bug fixes of multi_zone consumer group (#170)
* fix two multi_zone cg bugs

* more fix

* address comments, also fix another bug in replicator reconciliation

* remove noisy log

* add gauge metric when no active zone is found
datoug authored May 1, 2017
1 parent 226fb2c commit ba68fc2
Showing 7 changed files with 79 additions and 79 deletions.
6 changes: 6 additions & 0 deletions common/metrics/defs.go
@@ -1025,6 +1025,8 @@ const (
ControllerNumOpenCGExtents
// ControllerNumConsumedCGExtents represents the count of consumed cg extents
ControllerNumConsumedCGExtents
// ControllerNoActiveZone indicates there's no active zone from dynamic config
ControllerNoActiveZone

// -- Replicator metrics -- //

@@ -1048,6 +1050,8 @@ const (
// ReplicatorOutConnMsgRead indicates how many messages OutConn read
ReplicatorOutConnMsgRead

// ReplicatorReconcileFail indicates the reconcile fails
ReplicatorReconcileFail
// ReplicatorReconcileDestRun indicates the reconcile for dest runs
ReplicatorReconcileDestRun
// ReplicatorReconcileDestFail indicates the reconcile for dest fails
@@ -1232,6 +1236,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
ControllerRetentionJobDuration: {Timer, "controller.retentionmgr.jobduration"},
ControllerGetAddressLatency: {Timer, "controller.retentionmgr.getaddresslatency"},
ControllerPurgeMessagesLatency: {Timer, "controller.retentionmgr.purgemessageslatency"},
ControllerNoActiveZone: {Gauge, "controller.no-active-zone"},
},

// definitions for Replicator metrics
@@ -1245,6 +1250,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicatorInConnMsgWritten: {Counter, "replicator.inconn.msgwritten"},
ReplicatorOutConnCreditsSent: {Counter, "replicator.outconn.creditssent"},
ReplicatorOutConnMsgRead: {Counter, "replicator.outconn.msgread"},
ReplicatorReconcileFail: {Gauge, "replicator.reconcile.fail"},
ReplicatorReconcileDestRun: {Gauge, "replicator.reconcile.dest.run"},
ReplicatorReconcileDestFail: {Gauge, "replicator.reconcile.dest.fail"},
ReplicatorReconcileDestFoundMissing: {Gauge, "replicator.reconcile.dest.foundmissing"},
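For reference, a minimal runnable sketch of how the two new gauges plug into the metric-definition table. All types below are simplified stand-ins for the real common/metrics package, not its actual API:

package main

import "fmt"

// Simplified stand-ins for the real common/metrics types.
type metricType int

const Gauge metricType = iota

type metricDefinition struct {
	metricType metricType
	metricName string
}

// Illustrative indices for the two gauges added in this commit.
const (
	ControllerNoActiveZone = iota
	ReplicatorReconcileFail
)

var metricDefs = map[int]metricDefinition{
	ControllerNoActiveZone:  {Gauge, "controller.no-active-zone"},
	ReplicatorReconcileFail: {Gauge, "replicator.reconcile.fail"},
}

func main() {
	for idx, def := range metricDefs {
		fmt.Printf("metric %d emits gauge %q\n", idx, def.metricName)
	}
}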
32 changes: 19 additions & 13 deletions services/controllerhost/consumer.go
@@ -764,6 +764,7 @@ func refreshOutputHostsForConsGroup(context *Context,
var dstType = getDstType(dstDesc)
var outputAddrs []string
var outputIDs []string
var consumeDisabled bool
var outputHosts map[string]*common.HostInfo

cgDesc, err := context.mm.ReadConsumerGroup(dstID, "", cgID, "")
@@ -782,11 +783,12 @@ func refreshOutputHostsForConsGroup(context *Context,

context.resultCache.write(cgID,
resultCacheParams{
dstType: dstType,
nExtents: nConsumable,
maxExtents: maxExtentsToConsume,
hostIDs: outputIDs,
expiry: now + ttl,
dstType: dstType,
nExtents: nConsumable,
maxExtents: maxExtentsToConsume,
hostIDs: outputIDs,
consumeDisabled: consumeDisabled,
expiry: now + ttl,
})
}

@@ -799,7 +801,8 @@ func refreshOutputHostsForConsGroup(context *Context,
}

// If we shouldn't consume in this zone (for a multi_zone cg), short-circuit and return
if !shouldConsumeInZone(context.localZone, cgDesc, cfg) {
if !shouldConsumeInZone(context, m3Scope, cgDesc, cfg) {
consumeDisabled = true
writeToCache(int64(outputCacheTTL))
return outputAddrs, nil
}
@@ -859,21 +862,24 @@ func refreshOutputHostsForConsGroup(context *Context,
// shouldConsumeInZone indicates whether we should consume from this zone for a multi_zone consumer group
// If failover mode is enabled in dynamic config, the active zone will be the one specified in dynamic config
// Otherwise, use the per cg override if it's specified
// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return true
func shouldConsumeInZone(zone string, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return false
func shouldConsumeInZone(context *Context, m3Scope int, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
if strings.EqualFold(dConfig.FailoverMode, `enabled`) {
return strings.EqualFold(zone, dConfig.ActiveZone)
return strings.EqualFold(context.localZone, dConfig.ActiveZone)
}

if cgDesc.IsSetActiveZone() {
return strings.EqualFold(zone, cgDesc.GetActiveZone())
if len(cgDesc.GetActiveZone()) > 0 {
return strings.EqualFold(context.localZone, cgDesc.GetActiveZone())
}

if len(dConfig.ActiveZone) > 0 {
return strings.EqualFold(zone, dConfig.ActiveZone)
return strings.EqualFold(context.localZone, dConfig.ActiveZone)
}

return true
context.log.Warn(`no active zone from dynamic config!`)
context.m3Client.UpdateGauge(m3Scope, metrics.ControllerNoActiveZone, 1)

return false
}

func getControllerDynamicConfig(context *Context) (ControllerDynamicConfig, error) {
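To make the new consume-gating logic easier to follow, here is a minimal, self-contained sketch of the decision ladder shouldConsumeInZone now implements. The types are stand-ins; the real function also warns and bumps the ControllerNoActiveZone gauge before returning false:

package main

import (
	"fmt"
	"strings"
)

// ControllerDynamicConfig mirrors only the fields the ladder consults.
type ControllerDynamicConfig struct {
	FailoverMode string
	ActiveZone   string
}

// shouldConsumeInZone sketches the decision order from the diff:
// 1. failover mode enabled: follow the dynamic-config active zone
// 2. per-consumer-group active-zone override, if set
// 3. dynamic-config active zone, if set
// 4. otherwise refuse to consume (this commit changed the default
//    from `return true` to `return false`)
func shouldConsumeInZone(localZone, cgActiveZone string, cfg ControllerDynamicConfig) bool {
	if strings.EqualFold(cfg.FailoverMode, "enabled") {
		return strings.EqualFold(localZone, cfg.ActiveZone)
	}
	if len(cgActiveZone) > 0 {
		return strings.EqualFold(localZone, cgActiveZone)
	}
	if len(cfg.ActiveZone) > 0 {
		return strings.EqualFold(localZone, cfg.ActiveZone)
	}
	return false
}

func main() {
	cfg := ControllerDynamicConfig{ActiveZone: "zone1"}
	fmt.Println(shouldConsumeInZone("zone1", "", cfg)) // true
	fmt.Println(shouldConsumeInZone("zone2", "", cfg)) // false
}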
20 changes: 10 additions & 10 deletions services/controllerhost/controllerhost.go
@@ -384,33 +384,33 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques
return nil, ErrMalformedUUID
}

response := func(outputHostIDs []string, err error) (*c.GetOutputHostsResult_, error) {
if len(outputHostIDs) < 1 {
// only count as failure if our answer contains no endpoints at all
response := func() (*c.GetOutputHostsResult_, error) {
if len(result.cachedResult) < 1 && !result.consumeDisabled {
// only count as failure if our answer contains no endpoints at all and consuming is not disabled
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerFailures)
return nil, err
return nil, ErrUnavailable
}
return &c.GetOutputHostsResult_{OutputHostIds: outputHostIDs}, nil
return &c.GetOutputHostsResult_{OutputHostIds: result.cachedResult}, nil
}

var now = context.timeSource.Now().UnixNano()

result = context.resultCache.readOutputHosts(cgUUID, now)
if result.cacheHit && !result.refreshCache {
return response(result.cachedResult, ErrUnavailable)
return response()
}

if !context.dstLock.TryLock(dstUUID, getLockTimeout(result)) {
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrTryLockCounter)
return response(result.cachedResult, ErrTryLock)
return response()
}

// With the lock being held, make sure someone else did not already
// refresh the cache in the mean time
result = context.resultCache.readOutputHosts(cgUUID, now)
if result.cacheHit && !result.refreshCache {
context.dstLock.Unlock(dstUUID)
return response(result.cachedResult, ErrUnavailable)
return response()
}

hostIDs, err := refreshOutputHostsForConsGroup(context, dstUUID, cgUUID, *result, now)
@@ -420,10 +420,10 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrBadEntityCounter)
return nil, err
}
return response(result.cachedResult, &shared.InternalServiceError{Message: err.Error()})
return response()
}

return response(hostIDs, ErrUnavailable)
return &c.GetOutputHostsResult_{OutputHostIds: hostIDs}, nil
}

// GetQueueDepthInfo to return queue depth backlog info for consumer group
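A minimal sketch of the reworked response closure, assuming simplified stand-ins for the real thrift structs. The key behavior change: an empty host list only counts as a failure when consuming is not disabled for this zone:

package main

import (
	"errors"
	"fmt"
)

var ErrUnavailable = errors.New("no output hosts available")

type cacheRead struct {
	cachedResult    []string
	consumeDisabled bool
}

type getOutputHostsResult struct {
	OutputHostIDs []string
}

func respond(result cacheRead) (*getOutputHostsResult, error) {
	if len(result.cachedResult) < 1 && !result.consumeDisabled {
		// only a failure if we have no endpoints and consuming is enabled;
		// the real code also bumps the ControllerFailures counter here
		return nil, ErrUnavailable
	}
	return &getOutputHostsResult{OutputHostIDs: result.cachedResult}, nil
}

func main() {
	_, err := respond(cacheRead{}) // no hosts, consume enabled: error
	fmt.Println(err)
	res, _ := respond(cacheRead{consumeDisabled: true}) // empty but OK
	fmt.Println(res.OutputHostIDs)
}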
24 changes: 15 additions & 9 deletions services/controllerhost/resultcache.go
@@ -42,22 +42,25 @@ type (
nExtents int
maxExtents int
dstType
hostIDs []string
hostIDs []string
consumeDisabled bool
}

resultCacheReadResult struct {
cachedResult []string
cacheHit bool
refreshCache bool
cachedResult []string
consumeDisabled bool
cacheHit bool
refreshCache bool
*resultCacheEntry
}

resultCacheParams struct {
dstType dstType
nExtents int
maxExtents int
hostIDs []string
expiry int64
dstType dstType
nExtents int
maxExtents int
hostIDs []string
consumeDisabled bool
expiry int64
}
)

@@ -108,6 +111,7 @@ func (cache *resultCache) write(key string, write resultCacheParams) {
cacheEntry.dstType = write.dstType
cacheEntry.expiry = write.expiry
cacheEntry.hostIDs = write.hostIDs
cacheEntry.consumeDisabled = write.consumeDisabled
cacheEntry.nExtents = write.nExtents
cacheEntry.maxExtents = write.maxExtents
cache.Put(key, cacheEntry)
@@ -135,13 +139,15 @@ func readResultCache(cache *resultCache, key string, svc string, now int64) *res

return &resultCacheReadResult{
cachedResult: cachedResult,
consumeDisabled: cacheEntry.consumeDisabled,
cacheHit: cacheHit,
refreshCache: refreshCache,
resultCacheEntry: cacheEntry,
}
}
return &resultCacheReadResult{
cachedResult: cachedResult,
consumeDisabled: false,
cacheHit: cacheHit,
refreshCache: refreshCache,
resultCacheEntry: &resultCacheEntry{},
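A minimal sketch of how the new consumeDisabled flag travels through the cache: written via the params struct, stored on the entry, and surfaced on reads. The cache is reduced to a plain map for illustration (the real one also tracks dstType, extent counts, and refresh state):

package main

import "fmt"

type resultCacheEntry struct {
	hostIDs         []string
	consumeDisabled bool
	expiry          int64
}

type resultCache map[string]*resultCacheEntry

func (c resultCache) write(key string, hostIDs []string, consumeDisabled bool, expiry int64) {
	c[key] = &resultCacheEntry{hostIDs: hostIDs, consumeDisabled: consumeDisabled, expiry: expiry}
}

func (c resultCache) read(key string, now int64) (hostIDs []string, consumeDisabled, hit bool) {
	if e, ok := c[key]; ok && now < e.expiry {
		return e.hostIDs, e.consumeDisabled, true
	}
	return nil, false, false
}

func main() {
	cache := resultCache{}
	cache.write("cg-1", nil, true, 100) // consume disabled in this zone
	hosts, disabled, hit := cache.read("cg-1", 50)
	fmt.Println(hosts, disabled, hit) // [] true true
}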
12 changes: 6 additions & 6 deletions services/frontendhost/frontend.go
@@ -1029,15 +1029,15 @@ func (h *Frontend) ReadConsumerGroupHosts(ctx thrift.Context, readRequest *c.Rea

getOutputHostReq := &controller.GetOutputHostsRequest{DestinationUUID: common.StringPtr(mCGDesc.GetDestinationUUID()), ConsumerGroupUUID: common.StringPtr(mCGDesc.GetConsumerGroupUUID())}
getOutputHostResp, err := cClient.GetOutputHosts(ctx, getOutputHostReq)
if err != nil || len(getOutputHostResp.GetOutputHostIds()) < 1 {
if err != nil {
lclLg = lclLg.WithField(common.TagErr, err)
}

lclLg.Error("No hosts returned from controller")
if err != nil {
lclLg.WithField(common.TagErr, err).Error(`error getting hosts from controller`)
return nil, err
}

if len(getOutputHostResp.GetOutputHostIds()) < 1 {
return c.NewReadConsumerGroupHostsResult_(), nil
}

outputHostIds := getOutputHostResp.GetOutputHostIds()

// Build our result
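A minimal sketch of the frontend change, with stand-in types for the thrift API: a controller error is now logged and returned as-is, while an empty host list (e.g. consuming disabled in this zone) yields an empty result instead of an error:

package main

import (
	"errors"
	"fmt"
)

type readConsumerGroupHostsResult struct {
	HostIDs []string
}

func readConsumerGroupHosts(getOutputHosts func() ([]string, error)) (*readConsumerGroupHostsResult, error) {
	hostIDs, err := getOutputHosts()
	if err != nil {
		// real code logs `error getting hosts from controller` first
		return nil, err
	}
	if len(hostIDs) < 1 {
		return &readConsumerGroupHostsResult{}, nil // empty result, not an error
	}
	return &readConsumerGroupHostsResult{HostIDs: hostIDs}, nil
}

func main() {
	res, err := readConsumerGroupHosts(func() ([]string, error) { return nil, nil })
	fmt.Println(res.HostIDs, err) // [] <nil>
	_, err = readConsumerGroupHosts(func() ([]string, error) { return nil, errors.New("controller down") })
	fmt.Println(err) // controller down
}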
63 changes: 23 additions & 40 deletions services/replicator/metadataReconciler.go
@@ -119,21 +119,34 @@ func (r *metadataReconciler) run() {
return
}

// Get the local destination and cgs
localDests, err := r.getAllMultiZoneDestInLocalZone()
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1)
return
}
localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1)
return
}

// destination/cg metadata reconciliation is only needed if this is a non-authoritative zone
var localCgs []*shared.ConsumerGroupDescription
if r.localZone != r.replicator.getAuthoritativeZone() {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestRun, 1)
localDests, remoteDests, err2 := r.reconcileDestMetadata()
if err2 != nil {
authoritativeZoneDests, err := r.getAllMultiZoneDestInAuthoritativeZone()
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestFail, 1)
return
}
} else {
r.reconcileDest(localDests, authoritativeZoneDests)

r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1)
localCgs, err = r.reconcileCgMetadata(localDests, remoteDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1)
return
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1)
authoritativeZoneCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(authoritativeZoneDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1)
} else {
r.reconcileCg(localCgs, authoritativeZoneCgs)
}
}
}

@@ -154,36 +167,6 @@ func (r *metadataReconciler) run() {
atomic.StoreInt64(&r.running, 0)
}

func (r *metadataReconciler) reconcileDestMetadata() ([]*shared.DestinationDescription, []*shared.DestinationDescription, error) {
localDests, err := r.getAllMultiZoneDestInLocalZone()
if err != nil {
return nil, nil, err
}

remoteDests, err := r.getAllMultiZoneDestInAuthoritativeZone()
if err != nil {
return nil, nil, err
}

r.reconcileDest(localDests, remoteDests)
return localDests, remoteDests, nil
}

func (r *metadataReconciler) reconcileCgMetadata(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) ([]*shared.ConsumerGroupDescription, error) {
localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests)
if err != nil {
return nil, err
}

remoteCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(remoteDests)
if err != nil {
return nil, err
}

r.reconcileCg(localCgs, remoteCgs)
return localCgs, nil
}

func (r *metadataReconciler) reconcileDest(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) {
var replicatorReconcileDestFoundMissingCount int64
localDestsSet := make(map[string]*shared.DestinationDescription, len(localDests))
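A minimal sketch of the restructured run() flow after this commit: local destinations and consumer groups are always fetched up front (a failure bumps the new replicator.reconcile.fail gauge and aborts), and dest/cg reconciliation against the authoritative zone runs only in non-authoritative zones. All types and fetch methods are stand-in stubs; the real code also reuses localCgs for extent reconciliation afterwards:

package main

import "fmt"

// reconciler stands in for the real metadataReconciler.
type reconciler struct {
	localZone, authoritativeZone string
}

func (r *reconciler) getLocalDests() ([]string, error)             { return []string{"dst1"}, nil }
func (r *reconciler) getLocalCgs(dests []string) ([]string, error) { return []string{"cg1"}, nil }
func (r *reconciler) getAuthDests() ([]string, error)              { return []string{"dst1"}, nil }
func (r *reconciler) getAuthCgs(dests []string) ([]string, error)  { return []string{"cg1"}, nil }
func (r *reconciler) reconcileDest(local, remote []string)         { fmt.Println("reconcile dests") }
func (r *reconciler) reconcileCg(local, remote []string)           { fmt.Println("reconcile cgs") }

func (r *reconciler) run() {
	// Local metadata is always fetched; failures bump the fail gauge.
	localDests, err := r.getLocalDests()
	if err != nil {
		fmt.Println("gauge: replicator.reconcile.fail = 1")
		return
	}
	localCgs, err := r.getLocalCgs(localDests)
	if err != nil {
		fmt.Println("gauge: replicator.reconcile.fail = 1")
		return
	}
	// Dest/cg reconciliation is only needed in a non-authoritative zone.
	if r.localZone != r.authoritativeZone {
		authDests, err := r.getAuthDests()
		if err != nil {
			fmt.Println("gauge: replicator.reconcile.dest.fail = 1")
			return
		}
		r.reconcileDest(localDests, authDests)
		authCgs, err := r.getAuthCgs(authDests)
		if err != nil {
			fmt.Println("gauge: replicator.reconcile.cg.fail = 1")
		} else {
			r.reconcileCg(localCgs, authCgs)
		}
	}
}

func main() {
	(&reconciler{localZone: "zone2", authoritativeZone: "zone1"}).run()
}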
1 change: 0 additions & 1 deletion services/replicator/replicator.go
@@ -1064,7 +1064,6 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
return err
}

lcllg.Info(`Ack offset updated in metadata`)
return nil
}
