Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async workflows integration test with kafka #5678

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ COPY docker/start.sh /start.sh

CMD /start.sh

# All-in-one Cadence server with Kafka (~550mb)
FROM cadence-auto-setup AS cadence-auto-setup-with-kafka

RUN apk add openjdk11
RUN wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz -O kafka.tgz
RUN mkdir -p kafka
RUN tar -xvzf kafka.tgz --strip 1 -C kafka
ENV KAFKA_HOME /etc/cadence/kafka

# Cadence CLI
FROM alpine AS cadence-cli

Expand Down
12 changes: 10 additions & 2 deletions common/asyncworkflow/queue/consumer/default_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

const (
defaultShutdownTimeout = 5 * time.Second
defaultStartWFTimeout = 3 * time.Second
defaultConcurrency = 100
)

Expand All @@ -60,6 +61,7 @@ type DefaultConsumer struct {
cancelFn context.CancelFunc
wg sync.WaitGroup
shutdownTimeout time.Duration
startWFTimeout time.Duration
msgDecoder codec.BinaryEncoder
concurrency int
}
Expand Down Expand Up @@ -90,6 +92,7 @@ func New(
ctx: ctx,
cancelFn: cancelFn,
shutdownTimeout: defaultShutdownTimeout,
startWFTimeout: defaultStartWFTimeout,
msgDecoder: codec.NewThriftRWEncoder(),
concurrency: defaultConcurrency,
}
Expand Down Expand Up @@ -190,9 +193,12 @@ func (c *DefaultConsumer) processRequest(logger log.Logger, request *sqlblobs.As

yarpcCallOpts := getYARPCOptions(request.GetHeader())
scope := scope.Tagged(metrics.DomainTag(startWFReq.GetDomain()))

var resp *types.StartWorkflowExecutionResponse
op := func() error {
resp, err = c.frontendClient.StartWorkflowExecution(c.ctx, startWFReq, yarpcCallOpts...)
ctx, cancel := context.WithTimeout(c.ctx, c.startWFTimeout)
defer cancel()
resp, err = c.frontendClient.StartWorkflowExecution(ctx, startWFReq, yarpcCallOpts...)
return err
}

Expand All @@ -214,7 +220,9 @@ func (c *DefaultConsumer) processRequest(logger log.Logger, request *sqlblobs.As
scope := c.scope.Tagged(metrics.DomainTag(startWFReq.GetDomain()))
var resp *types.StartWorkflowExecutionResponse
op := func() error {
resp, err = c.frontendClient.SignalWithStartWorkflowExecution(c.ctx, startWFReq, yarpcCallOpts...)
ctx, cancel := context.WithTimeout(c.ctx, c.startWFTimeout)
defer cancel()
resp, err = c.frontendClient.SignalWithStartWorkflowExecution(ctx, startWFReq, yarpcCallOpts...)
return err
}

Expand Down
80 changes: 49 additions & 31 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type (
GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64)
}

domainCache struct {
DefaultDomainCache struct {
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
status int32
shutdownChan chan struct{}
clusterGroup string
Expand Down Expand Up @@ -141,15 +141,26 @@ type (
}
)

type DomainCacheOption func(*DefaultDomainCache)

func WithTimeSource(timeSource clock.TimeSource) DomainCacheOption {
return func(cache *DefaultDomainCache) {
if timeSource != nil {
cache.timeSource = timeSource
}
}
}

// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(
domainManager persistence.DomainManager,
metadata cluster.Metadata,
metricsClient metrics.Client,
logger log.Logger,
) DomainCache {
opts ...DomainCacheOption,
) *DefaultDomainCache {

cache := &domainCache{
cache := &DefaultDomainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
clusterGroup: getClusterGroupIdentifier(metadata),
Expand All @@ -165,6 +176,10 @@ func NewDomainCache(
cache.cacheNameToID.Store(newDomainCache())
cache.cacheByID.Store(newDomainCache())

for _, opt := range opts {
opt(cache)
}

return cache
}

Expand Down Expand Up @@ -239,12 +254,12 @@ func NewDomainCacheEntryForTest(
}
}

func (c *domainCache) GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64) {
func (c *DefaultDomainCache) GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64) {
return int64(c.cacheByID.Load().(Cache).Size()), int64(c.cacheNameToID.Load().(Cache).Size())
}

// Start starts the background refresh of domain
func (c *domainCache) Start() {
func (c *DefaultDomainCache) Start() {
if !atomic.CompareAndSwapInt32(&c.status, domainCacheInitialized, domainCacheStarted) {
return
}
Expand All @@ -258,14 +273,14 @@ func (c *domainCache) Start() {
}

// Stop stops background refresh of domain
func (c *domainCache) Stop() {
func (c *DefaultDomainCache) Stop() {
if !atomic.CompareAndSwapInt32(&c.status, domainCacheStarted, domainCacheStopped) {
return
}
close(c.shutdownChan)
}

func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
func (c *DefaultDomainCache) GetAllDomain() map[string]*DomainCacheEntry {
result := make(map[string]*DomainCacheEntry)
ite := c.cacheByID.Load().(Cache).Iterator()
defer ite.Close()
Expand All @@ -286,7 +301,7 @@ func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
// WARN: the beforeCallback function will be triggered by domain cache when holding the domain cache lock,
// make sure the callback function will not call domain cache again in case of dead lock
// afterCallback will be invoked when NOT holding the domain cache lock.
func (c *domainCache) RegisterDomainChangeCallback(
func (c *DefaultDomainCache) RegisterDomainChangeCallback(
shard int,
initialNotificationVersion int64,
prepareCallback PrepareCallbackFn,
Expand Down Expand Up @@ -321,7 +336,7 @@ func (c *domainCache) RegisterDomainChangeCallback(
}

// UnregisterDomainChangeCallback delete a domain failover callback
func (c *domainCache) UnregisterDomainChangeCallback(
func (c *DefaultDomainCache) UnregisterDomainChangeCallback(
shard int,
) {

Expand All @@ -334,7 +349,7 @@ func (c *domainCache) UnregisterDomainChangeCallback(

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomain(
func (c *DefaultDomainCache) GetDomain(
name string,
) (*DomainCacheEntry, error) {

Expand All @@ -347,7 +362,7 @@ func (c *domainCache) GetDomain(

// GetDomainByID retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomainByID(
func (c *DefaultDomainCache) GetDomainByID(
id string,
) (*DomainCacheEntry, error) {

Expand All @@ -358,7 +373,7 @@ func (c *domainCache) GetDomainByID(
}

// GetDomainID retrieves domainID by using GetDomain
func (c *domainCache) GetDomainID(
func (c *DefaultDomainCache) GetDomainID(
name string,
) (string, error) {

Expand All @@ -370,7 +385,7 @@ func (c *domainCache) GetDomainID(
}

// GetDomainName returns domain name given the domain id
func (c *domainCache) GetDomainName(
func (c *DefaultDomainCache) GetDomainName(
id string,
) (string, error) {

Expand All @@ -381,37 +396,38 @@ func (c *domainCache) GetDomainName(
return entry.info.Name, nil
}

func (c *domainCache) refreshLoop() {
timer := time.NewTicker(DomainCacheRefreshInterval)
func (c *DefaultDomainCache) refreshLoop() {
timer := c.timeSource.NewTicker(DomainCacheRefreshInterval)
defer timer.Stop()

for {
select {
case <-c.shutdownChan:
return
case <-timer.C:
case <-timer.Chan():
for err := c.refreshDomains(); err != nil; err = c.refreshDomains() {
select {
case <-c.shutdownChan:
return
default:
c.logger.Error("Error refreshing domain cache", tag.Error(err))
time.Sleep(DomainCacheRefreshFailureRetryInterval)
c.timeSource.Sleep(DomainCacheRefreshFailureRetryInterval)
}
}
c.logger.Debug("Domain cache refreshed")
}
}
}

func (c *domainCache) refreshDomains() error {
func (c *DefaultDomainCache) refreshDomains() error {
c.refreshLock.Lock()
defer c.refreshLock.Unlock()
return c.refreshDomainsLocked()
}

// this function only refresh the domains in the v2 table
// the domains in the v1 table will be refreshed if cache is stale
func (c *domainCache) refreshDomainsLocked() error {
func (c *DefaultDomainCache) refreshDomainsLocked() error {
now := c.timeSource.Now()
if now.Sub(c.lastRefreshTime) < domainCacheMinRefreshInterval {
return nil
Expand Down Expand Up @@ -509,7 +525,7 @@ UpdateLoop:
return nil
}

func (c *domainCache) checkDomainExists(
func (c *DefaultDomainCache) checkDomainExists(
name string,
id string,
) error {
Expand All @@ -520,7 +536,7 @@ func (c *domainCache) checkDomainExists(
return err
}

func (c *domainCache) updateNameToIDCache(
func (c *DefaultDomainCache) updateNameToIDCache(
cacheNameToID Cache,
name string,
id string,
Expand All @@ -529,7 +545,7 @@ func (c *domainCache) updateNameToIDCache(
cacheNameToID.Put(name, id)
}

func (c *domainCache) updateIDToDomainCache(
func (c *DefaultDomainCache) updateIDToDomainCache(
cacheByID Cache,
id string,
record *DomainCacheEntry,
Expand Down Expand Up @@ -562,7 +578,7 @@ func (c *domainCache) updateIDToDomainCache(

// getDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) getDomain(
func (c *DefaultDomainCache) getDomain(
name string,
) (*DomainCacheEntry, error) {

Expand All @@ -589,12 +605,12 @@ func (c *domainCache) getDomain(
return c.getDomainByID(id, true)
}
// impossible case
return nil, &types.InternalServiceError{Message: "domainCache encounter case where domain exists but cannot be loaded"}
return nil, &types.InternalServiceError{Message: "DefaultDomainCache encounter case where domain exists but cannot be loaded"}
}

// getDomainByID retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) getDomainByID(
func (c *DefaultDomainCache) getDomainByID(
id string,
deepCopy bool,
) (*DomainCacheEntry, error) {
Expand Down Expand Up @@ -641,10 +657,10 @@ func (c *domainCache) getDomainByID(
return result, nil
}
// impossible case
return nil, &types.InternalServiceError{Message: "domainCache encounter case where domain exists but cannot be loaded"}
return nil, &types.InternalServiceError{Message: "DefaultDomainCache encounter case where domain exists but cannot be loaded"}
}

func (c *domainCache) triggerDomainChangePrepareCallbackLocked() {
func (c *DefaultDomainCache) triggerDomainChangePrepareCallbackLocked() {
sw := c.scope.StartTimer(metrics.DomainCachePrepareCallbacksLatency)
defer sw.Stop()

Expand All @@ -653,7 +669,7 @@ func (c *domainCache) triggerDomainChangePrepareCallbackLocked() {
}
}

func (c *domainCache) triggerDomainChangeCallbackLocked(
func (c *DefaultDomainCache) triggerDomainChangeCallbackLocked(
nextDomains []*DomainCacheEntry,
) {

Expand All @@ -665,7 +681,7 @@ func (c *domainCache) triggerDomainChangeCallbackLocked(
}
}

func (c *domainCache) buildEntryFromRecord(
func (c *DefaultDomainCache) buildEntryFromRecord(
record *persistence.GetDomainResponse,
) *DomainCacheEntry {

Expand Down Expand Up @@ -718,12 +734,14 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
VisibilityArchivalStatus: entry.config.VisibilityArchivalStatus,
VisibilityArchivalURI: entry.config.VisibilityArchivalURI,
BadBinaries: copyResetBinary(entry.config.BadBinaries),
AsyncWorkflowConfig: entry.config.AsyncWorkflowConfig.DeepCopy(),
// Q: Should we set IsolationGroups as well? Othewise domaincache will not be able to detect changes in isolation groups
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
}
result.replicationConfig = &persistence.DomainReplicationConfig{
ActiveClusterName: entry.replicationConfig.ActiveClusterName,
}
for _, clusterName := range entry.replicationConfig.Clusters {
result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &*clusterName)
for _, clusterCfg := range entry.replicationConfig.Clusters {
result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &*clusterCfg)
}
result.configVersion = entry.configVersion
result.failoverVersion = entry.failoverVersion
Expand Down
11 changes: 8 additions & 3 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type (

metadataMgr *mocks.MetadataManager

domainCache *domainCache
domainCache *DefaultDomainCache
logger log.Logger
}
)
Expand All @@ -75,7 +75,7 @@ func (s *domainCacheSuite) SetupTest() {

s.metadataMgr = &mocks.MetadataManager{}
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.domainCache = NewDomainCache(s.metadataMgr, cluster.GetTestClusterMetadata(true), metricsClient, s.logger).(*domainCache)
s.domainCache = NewDomainCache(s.metadataMgr, cluster.GetTestClusterMetadata(true), metricsClient, s.logger)

s.domainCache.timeSource = clock.NewMockedTimeSource()
}
Expand All @@ -93,7 +93,12 @@ func (s *domainCacheSuite) TestListDomain() {
Retention: 1,
BadBinaries: types.BadBinaries{
Binaries: map[string]*types.BadBinaryInfo{},
}},
},
AsyncWorkflowConfig: types.AsyncWorkflowConfiguration{
Enabled: true,
PredefinedQueueName: "test-async-wf-queue",
},
},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
Expand Down
2 changes: 2 additions & 0 deletions common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common/asyncworkflow/queue"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
Expand Down Expand Up @@ -78,5 +79,6 @@ type (
PinotConfig *config.PinotVisibilityConfig
PinotClient pinot.GenericClient
AsyncWorkflowQueueProvider queue.Provider
TimeSource clock.TimeSource
}
)
1 change: 1 addition & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func New(
params.ClusterMetadata,
params.MetricsClient,
logger,
cache.WithTimeSource(params.TimeSource),
)

domainMetricsScopeCache := cache.NewDomainMetricsScopeCache()
Expand Down
Loading