Skip to content

Commit

Permalink
Add Functional Tests and Fix Bugs (#6889)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
carlydf authored and dnr committed Nov 26, 2024
1 parent 679fb19 commit 0c0a67e
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 30 deletions.
4 changes: 3 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,9 @@ Deleted Redirect Rules will be kept in the DB (with DeleteTimestamp). After this
"matching.wv.ReachabilityBuildIdVisibilityGracePeriod",
3*time.Minute,
`ReachabilityBuildIdVisibilityGracePeriod is the time period for which deleted versioning rules are still considered active
to account for the delay in updating the build id field in visibility.`,
to account for the delay in updating the build id field in visibility. Not yet supported for GetDeploymentReachability. We recommend waiting
at least 2 minutes between changing the current deployment and calling GetDeployment, so that newly started workflow executions using the
recently-current deployment can arrive in visibility.`,
)
ReachabilityTaskQueueScanLimit = NewGlobalIntSetting(
"limit.reachabilityTaskQueueScan",
Expand Down
6 changes: 6 additions & 0 deletions common/rpc/interceptor/redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ var (
"DescribeBatchOperation": func() any { return &workflowservice.DescribeBatchOperationResponse{} },
"ListBatchOperations": func() any { return &workflowservice.ListBatchOperationsResponse{} },
"UpdateActivityOptionsById": func() any { return &workflowservice.UpdateActivityOptionsByIdResponse{} },

"DescribeDeployment": func() any { return &workflowservice.DescribeDeploymentResponse{} },
"ListDeployments": func() any { return &workflowservice.ListDeploymentsResponse{} },
"GetDeploymentReachability": func() any { return &workflowservice.GetDeploymentReachabilityResponse{} },
"GetCurrentDeployment": func() any { return &workflowservice.GetCurrentDeploymentResponse{} },
"SetCurrentDeployment": func() any { return &workflowservice.SetCurrentDeploymentResponse{} },
}
)

Expand Down
10 changes: 3 additions & 7 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const (
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
)

// TODO (carly): fix delimiter
// escapeBuildIdSearchAttributeDelimiter is a helper which escapes the BuildIdSearchAttributeDelimiter character in the input string
func escapeBuildIdSearchAttributeDelimiter(s string) string {
s = strings.Replace(s, BuildIdSearchAttributeDelimiter, `|`+BuildIdSearchAttributeDelimiter, -1)
Expand All @@ -63,16 +62,13 @@ func escapeBuildIdSearchAttributeDelimiter(s string) string {
// BuildIds KeywordList in this format. If the workflow becomes unpinned or unversioned, this entry will be removed from
// that list.
func PinnedBuildIdSearchAttribute(deployment *deploymentpb.Deployment) string {
escapedDeployment := fmt.Sprintf("%s%s%s",
return fmt.Sprintf("%s%s%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()),
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()),
)
return sqlparser.String(sqlparser.NewStrVal([]byte(fmt.Sprintf("%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapedDeployment,
))))
}

// AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID
Expand Down
17 changes: 7 additions & 10 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package updateworkflowoptions
import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/fieldmaskpb"

"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -64,24 +63,22 @@ func Invoke(
req.GetWorkflowExecution().GetWorkflowId(),
req.GetWorkflowExecution().GetRunId(),
),
func(workflowLease api.WorkflowLease) (_ *api.UpdateWorkflowAction, updateError error) {
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
defer func() { workflowLease.GetReleaseFn()(updateError) }()

if !mutableState.IsWorkflowExecutionRunning() {
// in-memory mutable state is still clean, let updateError=nil in the defer func()
// to prevent clearing and reloading mutable state while releasing the lock
return nil, consts.ErrWorkflowCompleted
}

// Merge the requested options mentioned in the field mask with the current options in the mutable state
mergedOpts, updateError := applyWorkflowExecutionOptions(
mergedOpts, err := applyWorkflowExecutionOptions(
getOptionsFromMutableState(mutableState),
req.GetWorkflowExecutionOptions(),
req.GetUpdateMask(),
)
if updateError != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error parsing update_options: %v", updateError))
if err != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error applying update_options: %v", err))
}

// Set options for gRPC response
Expand All @@ -95,9 +92,9 @@ func Invoke(
}, nil
}

_, updateError = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(req.GetWorkflowExecutionOptions().GetVersioningOverride())
if updateError != nil {
return nil, updateError
_, err = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride())
if err != nil {
return nil, err
}

// TODO (carly) part 2: handle safe deployment change --> CreateWorkflowTask=true
Expand Down
5 changes: 4 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,10 @@ func (ms *MutableStateImpl) saveBuildIds(buildIds []string, maxSearchAttributeVa
ms.executionInfo.SearchAttributes = searchAttributes
}

hasUnversionedOrAssigned := worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
hasUnversionedOrAssigned := false
if len(buildIds) > 0 { // len is 0 if we are removing the pinned search attribute and the workflow was never unversioned or assigned
hasUnversionedOrAssigned = worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
}
for {
saPayload, err := searchattribute.EncodeValue(buildIds, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions service/worker/deployment/deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ type DeploymentClientImpl struct {
VisibilityManager manager.VisibilityManager
MaxIDLengthLimit dynamicconfig.IntPropertyFn
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter

reachabilityCache reachabilityCache
reachabilityCache reachabilityCache
}

var _ DeploymentStoreClient = (*DeploymentClientImpl)(nil)
Expand Down
6 changes: 3 additions & 3 deletions service/worker/deployment/deployment_reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func getDeploymentReachability(
) (enumspb.DeploymentReachability, time.Time, error) {
// 1a. Reachable by new unpinned workflows
if isCurrent {
// TODO (carly): still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
// TODO (carly) part 2: still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
return enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, time.Now(), nil
}

Expand Down Expand Up @@ -99,10 +99,10 @@ func makeCountRequest(

func makeDeploymentQuery(seriesName, buildID string, open bool) string {
var statusFilter string
deploymentFilter := "= " + worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
SeriesName: seriesName,
BuildId: buildID,
})
}))
if open {
statusFilter = "= 'Running'"
} else {
Expand Down
7 changes: 3 additions & 4 deletions service/worker/deployment/deployment_reachability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func TestMakeDeploymentQuery(t *testing.T) {
buildId := "A"

query := makeDeploymentQuery(seriesName, buildId, true)
expectedQuery := "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
expectedQuery := "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
assert.Equal(t, expectedQuery, query)

query = makeDeploymentQuery(seriesName, buildId, false)
expectedQuery = "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
expectedQuery = "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
assert.Equal(t, expectedQuery, query)
}

Expand All @@ -63,9 +63,8 @@ func TestReachable_CurrentDeployment(t *testing.T) {
vm := manager.NewMockVisibilityManager(gomock.NewController(t)) // won't receive any calls
testCache := newReachabilityCache(metrics.NoopMetricsHandler, vm, testReachabilityCacheOpenWFsTTL, testReachabilityCacheClosedWFsTTL)

reach, reachValidTime, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
reach, _, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
assert.Nil(t, err)
assert.Greater(t, time.Now(), reachValidTime)
assert.Equal(t, enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, reach)
}

Expand Down
4 changes: 2 additions & 2 deletions service/worker/deployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func DeploymentStoreClientProvider(historyClient resource.HistoryClient, visibil
reachabilityCache: newReachabilityCache(
metrics.NoopMetricsHandler,
visibilityManager,
reachabilityCacheOpenWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheOpenWFsTTL)
reachabilityCacheClosedWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheClosedWFsTTL)
dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc)(),
dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc)(),
),
}
}
Expand Down
Loading

0 comments on commit 0c0a67e

Please sign in to comment.