This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Bug fixes of multi_zone consumer group #170

Merged: 5 commits, May 1, 2017
6 changes: 6 additions & 0 deletions common/metrics/defs.go
@@ -1025,6 +1025,8 @@ const (
ControllerNumOpenCGExtents
// ControllerNumConsumedCGExtents represents the count of consumed cg extents
ControllerNumConsumedCGExtents
// ControllerNoActiveZone indicates there's no active zone from dynamic config
ControllerNoActiveZone

// -- Replicator metrics -- //

@@ -1048,6 +1050,8 @@ const (
// ReplicatorOutConnMsgRead indicates how many messages OutConn read
ReplicatorOutConnMsgRead

// ReplicatorReconcileFail indicates the reconcile fails
ReplicatorReconcileFail
// ReplicatorReconcileDestRun indicates the reconcile for dest runs
ReplicatorReconcileDestRun
// ReplicatorReconcileDestFail indicates the reconcile for dest fails
@@ -1232,6 +1236,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
ControllerRetentionJobDuration: {Timer, "controller.retentionmgr.jobduration"},
ControllerGetAddressLatency: {Timer, "controller.retentionmgr.getaddresslatency"},
ControllerPurgeMessagesLatency: {Timer, "controller.retentionmgr.purgemessageslatency"},
ControllerNoActiveZone: {Gauge, "controller.no-active-zone"},
},

// definitions for Replicator metrics
@@ -1245,6 +1250,7 @@ var metricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicatorInConnMsgWritten: {Counter, "replicator.inconn.msgwritten"},
ReplicatorOutConnCreditsSent: {Counter, "replicator.outconn.creditssent"},
ReplicatorOutConnMsgRead: {Counter, "replicator.outconn.msgread"},
ReplicatorReconcileFail: {Gauge, "replicator.reconcile.fail"},
ReplicatorReconcileDestRun: {Gauge, "replicator.reconcile.dest.run"},
ReplicatorReconcileDestFail: {Gauge, "replicator.reconcile.dest.fail"},
ReplicatorReconcileDestFoundMissing: {Gauge, "replicator.reconcile.dest.foundmissing"},
32 changes: 19 additions & 13 deletions services/controllerhost/consumer.go
@@ -764,6 +764,7 @@ func refreshOutputHostsForConsGroup(context *Context,
var dstType = getDstType(dstDesc)
var outputAddrs []string
var outputIDs []string
var consumeDisabled bool
var outputHosts map[string]*common.HostInfo

cgDesc, err := context.mm.ReadConsumerGroup(dstID, "", cgID, "")
@@ -782,11 +783,12 @@

context.resultCache.write(cgID,
resultCacheParams{
dstType: dstType,
nExtents: nConsumable,
maxExtents: maxExtentsToConsume,
hostIDs: outputIDs,
expiry: now + ttl,
dstType: dstType,
nExtents: nConsumable,
maxExtents: maxExtentsToConsume,
hostIDs: outputIDs,
consumeDisabled: consumeDisabled,
expiry: now + ttl,
})
}

@@ -799,7 +801,8 @@ func refreshOutputHostsForConsGroup(context *Context,
}

// If we shouldn't consume in this zone (for a multi_zone cg), short-circuit and return
if !shouldConsumeInZone(context.localZone, cgDesc, cfg) {
if !shouldConsumeInZone(context, m3Scope, cgDesc, cfg) {
consumeDisabled = true
writeToCache(int64(outputCacheTTL))
return outputAddrs, nil
}
@@ -859,21 +862,24 @@ func refreshOutputHostsForConsGroup(context *Context,
// shouldConsumeInZone indicates whether we should consume from this zone for a multi_zone consumer group
// If failover mode is enabled in dynamic config, the active zone will be the one specified in dynamic config
// Otherwise, use the per cg override if it's specified
// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return true
func shouldConsumeInZone(zone string, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return false
func shouldConsumeInZone(context *Context, m3Scope int, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
if strings.EqualFold(dConfig.FailoverMode, `enabled`) {
return strings.EqualFold(zone, dConfig.ActiveZone)
return strings.EqualFold(context.localZone, dConfig.ActiveZone)
}

if cgDesc.IsSetActiveZone() {
return strings.EqualFold(zone, cgDesc.GetActiveZone())
if len(cgDesc.GetActiveZone()) > 0 {
return strings.EqualFold(context.localZone, cgDesc.GetActiveZone())
}

if len(dConfig.ActiveZone) > 0 {
return strings.EqualFold(zone, dConfig.ActiveZone)
return strings.EqualFold(context.localZone, dConfig.ActiveZone)
}

return true
context.log.Warn(`no active zone from dynamic config!`)
Reviewer comment (Contributor): would be a good idea to emit a metric here

context.m3Client.UpdateGauge(m3Scope, metrics.ControllerNoActiveZone, 1)

return false
}

func getControllerDynamicConfig(context *Context) (ControllerDynamicConfig, error) {
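For clarity, here is a dependency-free sketch of the precedence order that shouldConsumeInZone now implements: failover mode (if enabled) wins, then the per-consumer-group active-zone override, then the active zone from dynamic config, and finally a default of false instead of the old true. This is an illustration only; the function and parameter names below are hypothetical stand-ins, not the project's code.

```go
package sketch

import "strings"

// shouldConsumeSketch restates the decision made by shouldConsumeInZone above,
// with plain string parameters instead of *Context / ConsumerGroupDescription.
func shouldConsumeSketch(localZone, failoverMode, dynamicActiveZone, cgActiveZone string) bool {
	// 1. Failover mode enabled: only the active zone from dynamic config may consume.
	if strings.EqualFold(failoverMode, "enabled") {
		return strings.EqualFold(localZone, dynamicActiveZone)
	}
	// 2. A per consumer-group active-zone override, when set, wins next.
	if len(cgActiveZone) > 0 {
		return strings.EqualFold(localZone, cgActiveZone)
	}
	// 3. Otherwise fall back to the active zone configured in dynamic config.
	if len(dynamicActiveZone) > 0 {
		return strings.EqualFold(localZone, dynamicActiveZone)
	}
	// 4. Nothing configured: the PR changes the default from true to false, so a
	//    multi_zone consumer group with no active zone no longer consumes anywhere
	//    (and the controller logs a warning and bumps ControllerNoActiveZone).
	return false
}
```

For example, with localZone "zone2", an empty failover mode, no per-cg override, and dynamicActiveZone "zone1", the sketch returns false, which is the case refreshOutputHostsForConsGroup now short-circuits with consumeDisabled set.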
20 changes: 10 additions & 10 deletions services/controllerhost/controllerhost.go
@@ -384,33 +384,33 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques
return nil, ErrMalformedUUID
}

response := func(outputHostIDs []string, err error) (*c.GetOutputHostsResult_, error) {
if len(outputHostIDs) < 1 {
// only count as failure if our answer contains no endpoints at all
response := func() (*c.GetOutputHostsResult_, error) {
if len(result.cachedResult) < 1 && !result.consumeDisabled {
// only count as failure if our answer contains no endpoints at all and consuming is not disabled
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerFailures)
return nil, err
return nil, ErrUnavailable
}
return &c.GetOutputHostsResult_{OutputHostIds: outputHostIDs}, nil
return &c.GetOutputHostsResult_{OutputHostIds: result.cachedResult}, nil
}

var now = context.timeSource.Now().UnixNano()

result = context.resultCache.readOutputHosts(cgUUID, now)
if result.cacheHit && !result.refreshCache {
return response(result.cachedResult, ErrUnavailable)
return response()
}

if !context.dstLock.TryLock(dstUUID, getLockTimeout(result)) {
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrTryLockCounter)
return response(result.cachedResult, ErrTryLock)
return response()
}

// With the lock being held, make sure someone else did not already
// refresh the cache in the mean time
result = context.resultCache.readOutputHosts(cgUUID, now)
if result.cacheHit && !result.refreshCache {
context.dstLock.Unlock(dstUUID)
return response(result.cachedResult, ErrUnavailable)
return response()
}

hostIDs, err := refreshOutputHostsForConsGroup(context, dstUUID, cgUUID, *result, now)
@@ -420,10 +420,10 @@ func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsReques
context.m3Client.IncCounter(metrics.GetOutputHostsScope, metrics.ControllerErrBadEntityCounter)
return nil, err
}
return response(result.cachedResult, &shared.InternalServiceError{Message: err.Error()})
return response()
}

return response(hostIDs, ErrUnavailable)
return &c.GetOutputHostsResult_{OutputHostIds: hostIDs}, nil
}

// GetQueueDepthInfo returns queue depth backlog info for a consumer group
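To make the new failure accounting explicit: GetOutputHosts now treats an empty host list as a controller failure only when consuming is not deliberately disabled for this zone. A simplified, self-contained sketch of that decision (the type and error below are stand-ins for resultCacheReadResult and ErrUnavailable):

```go
package sketch

import "errors"

// errUnavailable stands in for the controller's ErrUnavailable.
var errUnavailable = errors.New("no output hosts available")

// cachedAnswer is a stand-in for the fields of resultCacheReadResult used here.
type cachedAnswer struct {
	hostIDs         []string
	consumeDisabled bool
}

// respond mirrors the response closure in GetOutputHosts: an empty answer counts
// as a failure only if this zone is actually supposed to be consuming.
func respond(r cachedAnswer) ([]string, error) {
	if len(r.hostIDs) < 1 && !r.consumeDisabled {
		// the real code also bumps the ControllerFailures counter here
		return nil, errUnavailable
	}
	// a consume-disabled zone legitimately returns success with zero hosts
	return r.hostIDs, nil
}
```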
24 changes: 15 additions & 9 deletions services/controllerhost/resultcache.go
@@ -42,22 +42,25 @@ type (
nExtents int
maxExtents int
dstType
hostIDs []string
hostIDs []string
consumeDisabled bool
}

resultCacheReadResult struct {
cachedResult []string
cacheHit bool
refreshCache bool
cachedResult []string
consumeDisabled bool
cacheHit bool
refreshCache bool
*resultCacheEntry
}

resultCacheParams struct {
dstType dstType
nExtents int
maxExtents int
hostIDs []string
expiry int64
dstType dstType
nExtents int
maxExtents int
hostIDs []string
consumeDisabled bool
expiry int64
}
)

@@ -108,6 +111,7 @@ func (cache *resultCache) write(key string, write resultCacheParams) {
cacheEntry.dstType = write.dstType
cacheEntry.expiry = write.expiry
cacheEntry.hostIDs = write.hostIDs
cacheEntry.consumeDisabled = write.consumeDisabled
cacheEntry.nExtents = write.nExtents
cacheEntry.maxExtents = write.maxExtents
cache.Put(key, cacheEntry)
@@ -135,13 +139,15 @@ func readResultCache(cache *resultCache, key string, svc string, now int64) *res

return &resultCacheReadResult{
cachedResult: cachedResult,
consumeDisabled: cacheEntry.consumeDisabled,
cacheHit: cacheHit,
refreshCache: refreshCache,
resultCacheEntry: cacheEntry,
}
}
return &resultCacheReadResult{
cachedResult: cachedResult,
consumeDisabled: false,
cacheHit: cacheHit,
refreshCache: refreshCache,
resultCacheEntry: &resultCacheEntry{},
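The consumeDisabled flag travels with the cache entry, so repeated GetOutputHosts calls inside the TTL keep returning the same "empty but successful" answer without re-running refreshOutputHostsForConsGroup. A rough usage sketch, assuming it lives in the same package as the cache; cgID, the TTL, and the numbers are made-up example values, while the field and method names follow the diff above:

```go
// cacheDisabledZoneAnswer writes a consume-disabled entry for a consumer group and
// shows how a later read surfaces the flag again. Sketch only.
func cacheDisabledZoneAnswer(cache *resultCache, cgID string, now, ttl int64) bool {
	cache.write(cgID, resultCacheParams{
		nExtents:        0,
		maxExtents:      8,    // arbitrary example value
		hostIDs:         nil,  // nothing to hand out in this zone
		consumeDisabled: true, // this zone is not the active zone for the cg
		expiry:          now + ttl,
	})

	res := cache.readOutputHosts(cgID, now)
	// the caller can report success with an empty host list instead of a failure
	return res.cacheHit && res.consumeDisabled
}
```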
12 changes: 6 additions & 6 deletions services/frontendhost/frontend.go
@@ -1015,15 +1015,15 @@ func (h *Frontend) ReadConsumerGroupHosts(ctx thrift.Context, readRequest *c.Rea

getOutputHostReq := &controller.GetOutputHostsRequest{DestinationUUID: common.StringPtr(mCGDesc.GetDestinationUUID()), ConsumerGroupUUID: common.StringPtr(mCGDesc.GetConsumerGroupUUID())}
getOutputHostResp, err := cClient.GetOutputHosts(ctx, getOutputHostReq)
if err != nil || len(getOutputHostResp.GetOutputHostIds()) < 1 {
if err != nil {
lclLg = lclLg.WithField(common.TagErr, err)
}

lclLg.Error("No hosts returned from controller")
if err != nil {
lclLg.WithField(common.TagErr, err).Error(`error getting hosts from controller`)
return nil, err
}

if len(getOutputHostResp.GetOutputHostIds()) < 1 {
return c.NewReadConsumerGroupHostsResult_(), nil
}

outputHostIds := getOutputHostResp.GetOutputHostIds()

// Build our result
63 changes: 23 additions & 40 deletions services/replicator/metadataReconciler.go
@@ -119,21 +119,34 @@ func (r *metadataReconciler) run() {
return
}

// Get the local destinations and cgs
localDests, err := r.getAllMultiZoneDestInLocalZone()
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1)
return
}
localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileFail, 1)
return
}

// destination/cg metadata reconciliation is only needed if this is a non-authoritative zone
var localCgs []*shared.ConsumerGroupDescription
if r.localZone != r.replicator.getAuthoritativeZone() {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestRun, 1)
localDests, remoteDests, err2 := r.reconcileDestMetadata()
if err2 != nil {
authoritativeZoneDests, err := r.getAllMultiZoneDestInAuthoritativeZone()
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileDestFail, 1)
return
}
} else {
r.reconcileDest(localDests, authoritativeZoneDests)

r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1)
localCgs, err = r.reconcileCgMetadata(localDests, remoteDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1)
return
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgRun, 1)
authoritativeZoneCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(authoritativeZoneDests)
if err != nil {
r.m3Client.UpdateGauge(metrics.ReplicatorReconcileScope, metrics.ReplicatorReconcileCgFail, 1)
} else {
r.reconcileCg(localCgs, authoritativeZoneCgs)
}
}
}

@@ -154,36 +167,6 @@ func (r *metadataReconciler) run() {
atomic.StoreInt64(&r.running, 0)
}

func (r *metadataReconciler) reconcileDestMetadata() ([]*shared.DestinationDescription, []*shared.DestinationDescription, error) {
localDests, err := r.getAllMultiZoneDestInLocalZone()
if err != nil {
return nil, nil, err
}

remoteDests, err := r.getAllMultiZoneDestInAuthoritativeZone()
if err != nil {
return nil, nil, err
}

r.reconcileDest(localDests, remoteDests)
return localDests, remoteDests, nil
}

func (r *metadataReconciler) reconcileCgMetadata(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) ([]*shared.ConsumerGroupDescription, error) {
localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests)
if err != nil {
return nil, err
}

remoteCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(remoteDests)
if err != nil {
return nil, err
}

r.reconcileCg(localCgs, remoteCgs)
return localCgs, nil
}

func (r *metadataReconciler) reconcileDest(localDests []*shared.DestinationDescription, remoteDests []*shared.DestinationDescription) {
var replicatorReconcileDestFoundMissingCount int64
localDestsSet := make(map[string]*shared.DestinationDescription, len(localDests))
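To summarize the reworked control flow in run(): the local multi-zone destinations and consumer groups are now always loaded up front (with the new ReplicatorReconcileFail gauge emitted if either read fails), and the dest/cg comparison against the authoritative zone only happens when this replicator's zone is not itself authoritative; the removed helpers reconcileDestMetadata and reconcileCgMetadata are inlined into run(). A condensed, hypothetical outline of that ordering follows; the interface and its string slices are invented stand-ins, and only the sequence and the metric constant names mirror the diff:

```go
package sketch

// reconciler is an invented stand-in for *metadataReconciler, using string slices
// in place of the destination / consumer-group description types.
type reconciler interface {
	getAllMultiZoneDestInLocalZone() ([]string, error)
	getAllMultiZoneCgInLocalZone(localDests []string) ([]string, error)
	getAllMultiZoneDestInAuthoritativeZone() ([]string, error)
	getAllMultiZoneCgInAuthoritativeZone(authDests []string) ([]string, error)
	reconcileDest(localDests, authDests []string)
	reconcileCg(localCgs, authCgs []string)
	isAuthoritativeZone() bool
	emitGauge(metricName string) // stands in for m3Client.UpdateGauge(scope, metric, 1)
}

// reconcileOnce outlines the new ordering inside run().
func reconcileOnce(r reconciler) {
	localDests, err := r.getAllMultiZoneDestInLocalZone()
	if err != nil {
		r.emitGauge("ReplicatorReconcileFail") // new gauge for early failures
		return
	}
	localCgs, err := r.getAllMultiZoneCgInLocalZone(localDests)
	if err != nil {
		r.emitGauge("ReplicatorReconcileFail")
		return
	}

	// dest/cg reconciliation against the authoritative zone is only needed when
	// this zone is not authoritative
	if !r.isAuthoritativeZone() {
		r.emitGauge("ReplicatorReconcileDestRun")
		authDests, err := r.getAllMultiZoneDestInAuthoritativeZone()
		if err != nil {
			r.emitGauge("ReplicatorReconcileDestFail")
			return
		}
		r.reconcileDest(localDests, authDests)

		r.emitGauge("ReplicatorReconcileCgRun")
		authCgs, err := r.getAllMultiZoneCgInAuthoritativeZone(authDests)
		if err != nil {
			r.emitGauge("ReplicatorReconcileCgFail")
		} else {
			r.reconcileCg(localCgs, authCgs)
		}
	}
	// localDests and localCgs remain available for the extent reconciliation that
	// follows later in run(), outside this hunk
}
```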
1 change: 0 additions & 1 deletion services/replicator/replicator.go
@@ -1064,7 +1064,6 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
return err
}

lcllg.Info(`Ack offset updated in metadata`)
return nil
}
