Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async wf consumer manager should watch its enabled/disabled state instead of relying on restart #5966

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 73 additions & 19 deletions service/worker/asyncworkflow/async_workflow_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -58,6 +59,18 @@ func WithRefreshInterval(interval time.Duration) ConsumerManagerOptions {
}
}

func WithEnabledPropertyFn(enabledFn dynamicconfig.BoolPropertyFn) ConsumerManagerOptions {
return func(c *ConsumerManager) {
c.enabledFn = enabledFn
}
}

func WithEmitConsumerCountMetrifFn(fn func(int)) ConsumerManagerOptions {
return func(c *ConsumerManager) {
c.emitConsumerCountMetricFn = fn
}
}

func NewConsumerManager(
logger log.Logger,
metricsClient metrics.Client,
Expand All @@ -68,6 +81,7 @@ func NewConsumerManager(
) *ConsumerManager {
ctx, cancel := context.WithCancel(context.Background())
cm := &ConsumerManager{
enabledFn: dynamicconfig.GetBoolPropertyFn(true),
logger: logger.WithTags(tag.ComponentAsyncWFConsumptionManager),
metricsClient: metricsClient,
domainCache: domainCache,
Expand All @@ -81,25 +95,30 @@ func NewConsumerManager(
timeSrc: clock.NewRealTimeSource(),
}

cm.emitConsumerCountMetricFn = cm.emitConsumerCountMetric

for _, opt := range options {
opt(cm)
}
return cm
}

type ConsumerManager struct {
logger log.Logger
metricsClient metrics.Client
timeSrc clock.TimeSource
domainCache cache.DomainCache
queueProvider queue.Provider
frontendClient frontend.Client
refreshInterval time.Duration
shutdownTimeout time.Duration
ctx context.Context
cancelFn context.CancelFunc
wg sync.WaitGroup
activeConsumers map[string]provider.Consumer
// all member variables are accessed without any mutex with the assumption that they are only accessed by the background loop
enabledFn dynamicconfig.BoolPropertyFn
logger log.Logger
metricsClient metrics.Client
timeSrc clock.TimeSource
domainCache cache.DomainCache
queueProvider queue.Provider
frontendClient frontend.Client
refreshInterval time.Duration
shutdownTimeout time.Duration
ctx context.Context
cancelFn context.CancelFunc
wg sync.WaitGroup
activeConsumers map[string]provider.Consumer
emitConsumerCountMetricFn func(int)
}

func (c *ConsumerManager) Start() {
Expand All @@ -117,10 +136,7 @@ func (c *ConsumerManager) Stop() {
return
}

for qID, consumer := range c.activeConsumers {
consumer.Stop()
c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
}
c.stopConsumers()

c.logger.Info("Stopped ConsumerManager")
}
Expand All @@ -132,12 +148,30 @@ func (c *ConsumerManager) run() {
defer ticker.Stop()
c.logger.Info("ConsumerManager background loop started", tag.Dynamic("refresh-interval", c.refreshInterval))

c.refreshConsumers()
enabled := c.enabledFn()
if enabled {
c.refreshConsumers()
} else {
c.logger.Info("ConsumerManager is disabled at the moment so skipping initial refresh")
}

for {
select {
case <-ticker.Chan():
c.refreshConsumers()
previouslyEnabled := enabled
enabled = c.enabledFn()
if enabled != previouslyEnabled {
c.logger.Info("ConsumerManager enabled state changed", tag.Dynamic("enabled", enabled))
}

if enabled {
// refresh consumers every round when consumer is enabled
c.refreshConsumers()
} else {
// stop consumers when consumer is disabled
c.stopConsumers()
}

case <-c.ctx.Done():
c.logger.Info("ConsumerManager background loop stopped because context is done")
return
Expand Down Expand Up @@ -218,7 +252,27 @@ func (c *ConsumerManager) refreshConsumers() {
}

c.logger.Info("Refreshed consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(len(c.activeConsumers)))
c.emitConsumerCountMetricFn(len(c.activeConsumers))
}

func (c *ConsumerManager) emitConsumerCountMetric(count int) {
c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(count))
}

func (c *ConsumerManager) stopConsumers() {
if len(c.activeConsumers) == 0 {
return
}

c.logger.Info("Stopping all active consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
for qID, consumer := range c.activeConsumers {
consumer.Stop()
c.logger.Info("Stopped consumer", tag.AsyncWFQueueID(qID))
delete(c.activeConsumers, qID)
}

c.emitConsumerCountMetricFn(len(c.activeConsumers))
c.logger.Info("Stopped all active consumers", tag.Dynamic("consumer-count", len(c.activeConsumers)))
}

func (c *ConsumerManager) getQueue(cfg types.AsyncWorkflowConfiguration) (provider.Queue, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"sort"
"sync/atomic"
"testing"
"time"

Expand All @@ -36,6 +37,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue/provider"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -319,6 +321,84 @@ func TestConsumerManager(t *testing.T) {
}
}

func TestConsumerManagerEnabledDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
mockTimeSrc := clock.NewMockedTimeSource()
mockDomainCache := cache.NewMockDomainCache(ctrl)
mockQueueProvider := queue.NewMockProvider(ctrl)
dwc := domainWithConfig{
name: "domain1",
asyncWFCfg: types.AsyncWorkflowConfiguration{
Enabled: true,
QueueType: "kafka",
QueueConfig: &types.DataBlob{
EncodingType: types.EncodingTypeJSON.Ptr(),
Data: []byte(`{"brokers":["localhost:9092"],"topics":["test-topic"]}`),
},
},
}

mockDomainCache.EXPECT().GetAllDomain().Return(toDomainCacheEntries([]domainWithConfig{dwc})).AnyTimes()
queueMock := provider.NewMockQueue(ctrl)
queueMock.EXPECT().ID().Return(queueID(dwc.asyncWFCfg)).AnyTimes()
mockQueueProvider.EXPECT().GetQueue(gomock.Any(), gomock.Any()).Return(queueMock, nil).AnyTimes()
mockConsumer := provider.NewMockConsumer(ctrl)
mockConsumer.EXPECT().Start().Return(nil).AnyTimes()
mockConsumer.EXPECT().Stop().AnyTimes()
queueMock.EXPECT().CreateConsumer(gomock.Any()).Return(mockConsumer, nil).AnyTimes()

var consumerMgrEnabled, consumerCount int32

// create consumer manager
cm := NewConsumerManager(
testlogger.New(t),
metrics.NewNoopMetricsClient(),
mockDomainCache,
mockQueueProvider,
nil,
WithTimeSource(mockTimeSrc),
WithEnabledPropertyFn(func(opts ...dynamicconfig.FilterOption) bool {
return atomic.LoadInt32(&consumerMgrEnabled) == 1
}),
WithEmitConsumerCountMetrifFn(func(count int) {
atomic.StoreInt32(&consumerCount, int32(count))
}),
)

cm.Start()
defer cm.Stop()

// wait for the first round of consumers to be created and verify consumer count
atomic.StoreInt32(&consumerMgrEnabled, 1)
time.Sleep(50 * time.Millisecond)
t.Log("first round comparison")
got := atomic.LoadInt32(&consumerCount)
want := 1 // consumer manager is enabled
if got != int32(want) {
t.Fatalf("Consumer count mismatch after first round, want: %v, got: %v", want, got)
}

// disable consumer manager and wait for the second round of refresh
atomic.StoreInt32(&consumerMgrEnabled, 0)
mockTimeSrc.Advance(defaultRefreshInterval)
time.Sleep(50 * time.Millisecond)
got = atomic.LoadInt32(&consumerCount)
want = 0 // all consumers should be stopped when consumer manager is disabled
if got != int32(want) {
t.Fatalf("Consumer count mismatch after second round, want: %v, got: %v", want, got)
}

// enable consumer manager and wait for the third round of refresh
atomic.StoreInt32(&consumerMgrEnabled, 1)
mockTimeSrc.Advance(defaultRefreshInterval)
time.Sleep(50 * time.Millisecond)
got = atomic.LoadInt32(&consumerCount)
want = 1 // consumer manager is enabled
if got != int32(want) {
t.Fatalf("Consumer count mismatch after third round, want: %v, got: %v", want, got)
}
}

func toDomainCacheEntries(domains []domainWithConfig) map[string]*cache.DomainCacheEntry {
result := make(map[string]*cache.DomainCacheEntry, len(domains))
for _, d := range domains {
Expand Down
7 changes: 3 additions & 4 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ func (s *Service) Start() {
s.startFailoverManager()
}

if s.config.EnableAsyncWorkflowConsumption() {
cm := s.startAsyncWorkflowConsumerManager()
defer cm.Stop()
}
cm := s.startAsyncWorkflowConsumerManager()
defer cm.Stop()

logger.Info("worker started", tag.ComponentWorker)
<-s.stopC
Expand Down Expand Up @@ -407,6 +405,7 @@ func (s *Service) startAsyncWorkflowConsumerManager() common.Daemon {
s.GetDomainCache(),
s.Resource.GetAsyncWorkflowQueueProvider(),
s.GetFrontendClient(),
asyncworkflow.WithEnabledPropertyFn(s.config.EnableAsyncWorkflowConsumption),
)
cm.Start()
return cm
Expand Down
Loading