Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
return empty output host lists if current zone is not active zone for…
Browse files Browse the repository at this point in the history
… multi_zone cg (#139)

* [WIP] multi_zone consumer group active zone switch

* code checkpointing

* limit the multi_zone config to controller only

* add dummy config updater

* minor update

* address comments
  • Loading branch information
datoug authored Apr 12, 2017
1 parent 6d95e65 commit a3d3d4a
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/servicecmd/servicestartcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func StartControllerService() {
reporter := common.NewMetricReporterWithHostname(cfg.GetServiceConfig(serviceName))
dClient := dconfigclient.NewDconfigClient(cfg.GetServiceConfig(serviceName), serviceName)
sVice := common.NewService(serviceName, uuid.New(), cfg.GetServiceConfig(serviceName), common.NewUUIDResolver(meta), hwInfoReader, reporter, dClient)
mcp, tc := controllerhost.NewController(cfg, sVice, meta)
mcp, tc := controllerhost.NewController(cfg, sVice, meta, common.NewDummyZoneFailoverManager())
mcp.Start(tc)
common.ServiceLoop(cfg.GetServiceConfig(serviceName).GetPort()+diagnosticPortOffset, cfg, mcp.Service)
}
Expand Down
24 changes: 24 additions & 0 deletions common/zonefailovermanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

// ZoneFailoverManager is a daemon that can be used to manage the zone failover behavior.
type (
ZoneFailoverManager interface {
Daemon
}

dummyZoneFailoverManager struct {
}
)

// NewDummyZoneFailoverManager creates a dummy zone failover manager
func NewDummyZoneFailoverManager() ZoneFailoverManager {
return &dummyZoneFailoverManager{}
}

func (d *dummyZoneFailoverManager) Start() {
return
}

func (d *dummyZoneFailoverManager) Stop() {
return
}
6 changes: 6 additions & 0 deletions services/controllerhost/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,16 @@ type (
AdminStatus string `name:"adminStatus" default:"enabled"`
}

// ControllerDynamicConfig contains the config
// parameters needed for controller
ControllerDynamicConfig struct {
NumPublisherExtentsByPath []string `name:"numPublisherExtentsByPath" default:"/=4"`
NumConsumerExtentsByPath []string `name:"numConsumerExtentsByPath" default:"/=8"`
NumRemoteConsumerExtentsByPath []string `name:"numRemoteConsumerExtentsByPath" default:"/=4"`

// configs for multi_zone consumer group
ActiveZone string `name:"activeZone" default:""`
FailoverMode string `name:"failoverMode" default:"disabled"`
}
)

Expand Down
55 changes: 53 additions & 2 deletions services/controllerhost/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package controllerhost

import (
"strings"
"time"

"github.com/uber-common/bark"
Expand Down Expand Up @@ -628,6 +629,7 @@ func refreshOutputHostsForConsGroup(context *Context,
}

var nConsumable int
var maxExtentsToConsume int
var dstType = getDstType(dstDesc)
var outputAddrs []string
var outputIDs []string
Expand All @@ -643,8 +645,6 @@ func refreshOutputHostsForConsGroup(context *Context,
return nil, err
}

var maxExtentsToConsume = maxExtentsToConsumeForDst(context, dstDesc.GetPath(), cgDesc.GetConsumerGroupName(), dstType, dstDesc.GetZoneConfigs())

writeToCache := func(ttl int64) {

outputIDs, outputAddrs = hostInfoMapToSlice(outputHosts)
Expand All @@ -659,6 +659,23 @@ func refreshOutputHostsForConsGroup(context *Context,
})
}

if cgDesc.GetIsMultiZone() {
cfg, err := getControllerDynamicConfig(context)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrMetadataReadCounter)
context.m3Client.IncCounter(m3Scope, metrics.ControllerFailures)
return nil, err
}

// If we shouldn't consume in this zone(for a multi_zone cg), short circuit and return
if !shouldConsumeInZone(context.localZone, cgDesc, cfg) {
writeToCache(int64(outputCacheTTL))
return outputAddrs, nil
}
}

maxExtentsToConsume = maxExtentsToConsumeForDst(context, dstDesc.GetPath(), cgDesc.GetConsumerGroupName(), dstType, dstDesc.GetZoneConfigs())

cgExtents, outputHosts, err := fetchClassifyOpenCGExtents(context, dstID, cgID, m3Scope)
if err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerFailures)
Expand Down Expand Up @@ -707,3 +724,37 @@ func refreshOutputHostsForConsGroup(context *Context,
writeToCache(failBackoffInterval)
return outputAddrs, err
}

// shouldConsumeInZone indicated whether we should consume from this zone for a multi_zone consumer group
// If failover mode is enabled in dynamic config, the active zone will be the one specified in dynamic config
// Otherwise, use the per cg override if it's specified
// Last, check the active zone in dynamic config. If specified, use it. Otherwise always return true
func shouldConsumeInZone(zone string, cgDesc *shared.ConsumerGroupDescription, dConfig ControllerDynamicConfig) bool {
if strings.EqualFold(dConfig.FailoverMode, `enabled`) {
return strings.EqualFold(zone, dConfig.ActiveZone)
}

if cgDesc.IsSetActiveZone() {
return strings.EqualFold(zone, cgDesc.GetActiveZone())
}

if len(dConfig.ActiveZone) > 0 {
return strings.EqualFold(zone, dConfig.ActiveZone)
}

return true
}

func getControllerDynamicConfig(context *Context) (ControllerDynamicConfig, error) {
cfgObj, err := context.cfgMgr.Get(common.ControllerServiceName, "*", "*", "*")
if err != nil {
return ControllerDynamicConfig{}, err
}

cfg, ok := cfgObj.(ControllerDynamicConfig)
if !ok {
context.log.Fatal("Unexpected type mismatch, cfgObj.(ControllerDynamicConfig) failed !")
}

return cfg, nil
}
47 changes: 26 additions & 21 deletions services/controllerhost/controllerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,27 @@ type (
}
// Context holds the run-time context for controller
Context struct {
hostID string
localZone string
mm MetadataMgr
rpm common.RingpopMonitor
failureDetector Dfdd
log bark.Logger
dstLock LockMgr
eventPipeline EventPipeline
resultCache *resultCache
extentMonitor *extentStateMonitor
timeSource common.TimeSource
channel *tchannel.Channel
clientFactory common.ClientFactory
retMgr *retMgrRunner
appConfig configure.CommonAppConfig
m3Client metrics.Client
cfgMgr dconfig.ConfigManager
loadMetrics load.MetricsAggregator
placement Placement
extentSeals struct {
hostID string
localZone string
mm MetadataMgr
rpm common.RingpopMonitor
zoneFailoverManager common.ZoneFailoverManager
failureDetector Dfdd
log bark.Logger
dstLock LockMgr
eventPipeline EventPipeline
resultCache *resultCache
extentMonitor *extentStateMonitor
timeSource common.TimeSource
channel *tchannel.Channel
clientFactory common.ClientFactory
retMgr *retMgrRunner
appConfig configure.CommonAppConfig
m3Client metrics.Client
cfgMgr dconfig.ConfigManager
loadMetrics load.MetricsAggregator
placement Placement
extentSeals struct {
// set of extents for which seal is in progress
// if an extent exist in this set, some worker
// is guaranteed to be working on this extent
Expand Down Expand Up @@ -147,7 +148,7 @@ type (
var _ c.TChanController = (*Mcp)(nil)

// NewController creates and returns a new instance of Mcp controller
func NewController(cfg configure.CommonAppConfig, sVice *common.Service, metadataClient m.TChanMetadataService) (*Mcp, []thrift.TChanServer) {
func NewController(cfg configure.CommonAppConfig, sVice *common.Service, metadataClient m.TChanMetadataService, zoneFailoverManager common.ZoneFailoverManager) (*Mcp, []thrift.TChanServer) {
hostID := uuid.New()

instance := new(Mcp)
Expand All @@ -169,6 +170,7 @@ func NewController(cfg configure.CommonAppConfig, sVice *common.Service, metadat
}

context.localZone, _ = common.GetLocalClusterInfo(strings.ToLower(deploymentName))
context.zoneFailoverManager = zoneFailoverManager

context.dstLock = lockMgr
context.m3Client = metrics.NewClient(instance.Service.GetMetricsReporter(), metrics.Controller)
Expand Down Expand Up @@ -203,6 +205,8 @@ func (mcp *Mcp) Start(thriftService []thrift.TChanServer) {
context.channel = mcp.GetTChannel()
context.rpm = mcp.GetRingpopMonitor()

context.zoneFailoverManager.Start()

context.eventPipeline = NewEventPipeline(context, nEventPipelineWorkers)
context.eventPipeline.Start()

Expand Down Expand Up @@ -235,6 +239,7 @@ func (mcp *Mcp) Start(thriftService []thrift.TChanServer) {
// Stop stops the controller service
func (mcp *Mcp) Stop() {
mcp.hostIDHeartbeater.Stop()
mcp.context.zoneFailoverManager.Stop()
mcp.context.extentMonitor.Stop()
mcp.context.retMgr.Stop()
mcp.context.failureDetector.Stop()
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/controllerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *McpSuite) startController() {

sVice := common.NewService(serviceName, uuid.New(), serviceConfig, common.NewUUIDResolver(s.mClient), common.NewHostHardwareInfoReader(s.mClient), reporter, dClient)
//serviceConfig.SetRingHosts(
mcp, tc := NewController(s.cfg, sVice, s.mClient)
mcp, tc := NewController(s.cfg, sVice, s.mClient, common.NewDummyZoneFailoverManager())
s.mcp = mcp

context := s.mcp.context
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/event_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *EventPipelineSuite) SetupTest() {
reporter := common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig())
dClient := dconfig.NewDconfigClient(serviceConfig, common.ControllerServiceName)
sVice := common.NewService(serviceName, uuid.New(), serviceConfig, common.NewUUIDResolver(s.mClient), common.NewHostHardwareInfoReader(s.mClient), reporter, dClient)
mcp, _ := NewController(s.cfg, sVice, s.mClient)
mcp, _ := NewController(s.cfg, sVice, s.mClient, common.NewDummyZoneFailoverManager())
mcp.context.m3Client = &MockM3Metrics{}
s.mcp = mcp
ch, err := tchannel.NewChannel("event-pipeline-test", nil)
Expand Down
2 changes: 1 addition & 1 deletion services/controllerhost/extentmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *ExtentStateMonitorSuite) SetupTest() {

sVice := common.NewService(serviceName, uuid.New(), serviceConfig, common.NewUUIDResolver(s.mClient), common.NewHostHardwareInfoReader(s.mClient), reporter, dClient)

mcp, _ := NewController(s.cfg, sVice, s.mClient)
mcp, _ := NewController(s.cfg, sVice, s.mClient, common.NewDummyZoneFailoverManager())
mcp.context.m3Client = &MockM3Metrics{}
s.mcp = mcp
ch, err := tchannel.NewChannel("extent-state-monitor-test", nil)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
reporter := common.NewTestMetricsReporter()
dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.ControllerServiceName)
sVice := common.NewService(serviceName, uuid.New(), cfg.ServiceConfig[serviceName], tb.UUIDResolver, hwInfoReader, reporter, dClient)
ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient)
ch, tc := controllerhost.NewController(cfg, sVice, tb.mClient, common.NewDummyZoneFailoverManager())
ch.Start(tc)
tb.Controllers[hostID] = ch
}
Expand Down

0 comments on commit a3d3d4a

Please sign in to comment.