Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit metrics when transfer tasks could be ratelimited #5652

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ frontend.validSearchAttributes:
service: 1
user: 1
IsDeleted: 4
constraints: {}
constraints: {}
1 change: 1 addition & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (h *handlerImpl) CreateEngine(
h.GetMatchingRawClient(),
h.queueTaskProcessor,
h.failoverCoordinator,
h.workflowIDCache,
)
}

Expand Down
5 changes: 5 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflow"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
)

Expand Down Expand Up @@ -123,6 +124,7 @@ type (
clientChecker client.VersionChecker
replicationDLQHandler replication.DLQHandler
failoverMarkerNotifier failover.MarkerNotifier
wfIDCache workflowcache.WFCache
}
)

Expand Down Expand Up @@ -152,6 +154,7 @@ func NewEngineWithShardContext(
rawMatchingClient matching.Client,
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
) engine.Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()

Expand Down Expand Up @@ -234,6 +237,7 @@ func NewEngineWithShardContext(
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
wfIDCache: wfIDCache,
}
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
Expand All @@ -255,6 +259,7 @@ func NewEngineWithShardContext(
historyEngImpl.workflowResetter,
historyEngImpl.archivalClient,
openExecutionCheck,
historyEngImpl.wfIDCache,
)

historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
Expand Down
4 changes: 4 additions & 0 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand Down Expand Up @@ -87,6 +88,7 @@ func NewTransferQueueProcessor(
workflowResetter reset.WorkflowResetter,
archivalClient archiver.Client,
executionCheck invariant.Invariant,
wfIDCache workflowcache.WFCache,
) Processor {
logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
Expand All @@ -100,6 +102,7 @@ func NewTransferQueueProcessor(
workflowResetter,
logger,
config,
wfIDCache,
)

activeQueueProcessor := newTransferQueueActiveProcessor(
Expand Down Expand Up @@ -131,6 +134,7 @@ func NewTransferQueueProcessor(
logger,
clusterName,
config,
wfIDCache,
)
standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
clusterName,
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
"github.com/uber/cadence/service/worker/parentclosepolicy"
)
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewTransferActiveTaskExecutor(
workflowResetter reset.WorkflowResetter,
logger log.Logger,
config *config.Config,
wfIDCache workflowcache.WFCache,
) Executor {

return &transferActiveTaskExecutor{
Expand All @@ -93,6 +95,7 @@ func NewTransferActiveTaskExecutor(
executionCache,
logger,
config,
wfIDCache,
),
historyClient: shard.GetService().GetHistoryClient(),
parentClosePolicyClient: parentclosepolicy.NewClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
test "github.com/uber/cadence/service/history/testing"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
"github.com/uber/cadence/service/worker/parentclosepolicy"
)
Expand All @@ -66,6 +67,7 @@ type (
mockShard *shard.TestContext
mockEngine *engine.MockEngine
mockDomainCache *cache.MockDomainCache
mockWFCache *workflowcache.MockWFCache
mockHistoryClient *hclient.MockClient
mockMatchingClient *matching.MockClient

Expand Down Expand Up @@ -168,6 +170,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata
s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider
s.mockDomainCache = s.mockShard.Resource.DomainCache
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(s.domainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(s.domainID).Return(s.domainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainID(s.domainName).Return(s.domainID, nil).AnyTimes()
Expand Down Expand Up @@ -197,6 +200,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() {
nil,
s.logger,
config,
s.mockWFCache,
).(*transferActiveTaskExecutor)
s.transferActiveTaskExecutor.parentClosePolicyClient = s.mockParentClosePolicyClient
}
Expand Down Expand Up @@ -239,7 +243,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessActivityTask_Success() {
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1)

s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1)
err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -56,6 +57,7 @@ func NewTransferStandbyTaskExecutor(
logger log.Logger,
clusterName string,
config *config.Config,
wfIDCache workflowcache.WFCache,
) Executor {
return &transferStandbyTaskExecutor{
transferTaskExecutorBase: newTransferTaskExecutorBase(
Expand All @@ -64,6 +66,7 @@ func NewTransferStandbyTaskExecutor(
executionCache,
logger,
config,
wfIDCache,
),
clusterName: clusterName,
historyResender: historyResender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
test "github.com/uber/cadence/service/history/testing"
"github.com/uber/cadence/service/history/workflowcache"
warchiver "github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -60,6 +61,7 @@ type (
controller *gomock.Controller
mockShard *shard.TestContext
mockDomainCache *cache.MockDomainCache
mockWFCache *workflowcache.MockWFCache
mockNDCHistoryResender *ndc.MockHistoryResender
mockMatchingClient *matching.MockClient

Expand Down Expand Up @@ -136,6 +138,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata
s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider
s.mockDomainCache = s.mockShard.Resource.DomainCache
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
Expand All @@ -159,6 +162,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() {
s.logger,
s.clusterName,
config,
s.mockWFCache,
).(*transferStandbyTaskExecutor)
}

Expand Down Expand Up @@ -236,7 +240,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1)

s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1)
s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
Expand Down
7 changes: 7 additions & 0 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

Expand All @@ -59,6 +60,7 @@ type (
visibilityMgr persistence.VisibilityManager
config *config.Config
throttleRetry *backoff.ThrottleRetry
wfIDCache workflowcache.WFCache
}
)

Expand All @@ -68,6 +70,7 @@ func newTransferTaskExecutorBase(
executionCache *execution.Cache,
logger log.Logger,
config *config.Config,
wfIDCache workflowcache.WFCache,
) *transferTaskExecutorBase {
return &transferTaskExecutorBase{
shard: shard,
Expand All @@ -82,6 +85,7 @@ func newTransferTaskExecutorBase(
backoff.WithRetryPolicy(taskRetryPolicy),
backoff.WithRetryableError(common.IsServiceTransientError),
),
wfIDCache: wfIDCache,
}
}

Expand All @@ -99,6 +103,9 @@ func (t *transferTaskExecutorBase) pushActivity(
t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType()))
}

// Ratelimiting is not done. This is only to count the number of requests via metrics
t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is called by transfer active task executor and transfer standby task executor. Are we going to count those together in same rate limit quota?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a good catch. I have considered to address this in a follow up PR


return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{
DomainUUID: task.TargetDomainID,
SourceDomainUUID: task.DomainID,
Expand Down