Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per build id timestamp for when it was last made set default #4526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 125 additions & 127 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ message BuildId {
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock state_update_timestamp = 3;
// HLC timestamp representing when this build id was last made default in its version set.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// HLC timestamp representing when this build id was last made default in its version set.
// HLC timestamp representing when this build id was last made default in its version set, if ever.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently don't expose an API to add a build id without making it set default in a set

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point

// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// An internal represenation of temporal.api.taskqueue.v1.CompatibleVersionSet
Expand All @@ -50,15 +54,10 @@ message CompatibleVersionSet {
repeated string set_ids = 1;
// All the compatible versions, unordered except for the last element, which is considered the set "default".
repeated BuildId build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which
// refers to the build ID status.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock default_update_timestamp = 3;
// HLC timestamp representing when this set was last made the default for the queue.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
temporal.server.api.clock.v1.HybridLogicalClock queue_default_update_timestamp = 4;
temporal.server.api.clock.v1.HybridLogicalClock became_default_timestamp = 4;
}

// Holds all the data related to worker versioning for a task queue.
Expand Down
7 changes: 4 additions & 3 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,8 +1592,9 @@ func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue st
tag.WorkflowTaskQueueName(taskQueue),
tag.BuildId(buildId.Id))
return &persistencespb.BuildId{
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
Id: buildId.GetId(),
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: &stamp,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
41 changes: 18 additions & 23 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func gatherBuildIds(data *persistencespb.VersioningData) map[string]struct{} {
return buildIds
}

// RemoveBuildIds removes given buildIds from versioning data.
// Assumes that build ids are safe to remove, ex: a set default is never removed unless it is a single set member and
// that set is not default for the queue.
func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningData, buildIds []string) *persistencespb.VersioningData {
buildIdsMap := make(map[string]struct{}, len(buildIds))
for _, buildId := range buildIds {
Expand All @@ -113,9 +116,10 @@ func RemoveBuildIds(clock hlc.Clock, versioningData *persistencespb.VersioningDa
for buildIdIdx, buildId := range set.BuildIds {
if _, found := buildIdsMap[buildId.Id]; found {
set.BuildIds[buildIdIdx] = &persistencespb.BuildId{
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
Id: buildId.Id,
State: persistencespb.STATE_DELETED,
StateUpdateTimestamp: &clock,
BecameDefaultTimestamp: buildId.BecameDefaultTimestamp,
}
}
}
Expand Down Expand Up @@ -158,10 +162,9 @@ func shallowCloneVersioningData(data *persistencespb.VersioningData) *persistenc

func shallowCloneVersionSet(set *persistencespb.CompatibleVersionSet) *persistencespb.CompatibleVersionSet {
clone := &persistencespb.CompatibleVersionSet{
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
DefaultUpdateTimestamp: set.DefaultUpdateTimestamp,
QueueDefaultUpdateTimestamp: set.QueueDefaultUpdateTimestamp,
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
BecameDefaultTimestamp: set.BecameDefaultTimestamp,
}
copy(clone.BuildIds, set.BuildIds)
return clone
Expand Down Expand Up @@ -297,21 +300,13 @@ func updateImpl(timestamp hlc.Clock, data *persistencespb.VersioningData, req *w
}
// Merge the sets together, preserving the primary set's default by making it have the most recent timestamp.
primarySet := data.VersionSets[targetSetIdx]
primaryBuildId := primarySet.BuildIds[len(primarySet.BuildIds)-1]
primaryBuildId.BecameDefaultTimestamp = &timestamp
justPrimaryData := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: primarySet.SetIds,
BuildIds: primarySet.BuildIds,
DefaultUpdateTimestamp: &timestamp,
QueueDefaultUpdateTimestamp: primarySet.QueueDefaultUpdateTimestamp,
}},
VersionSets: []*persistencespb.CompatibleVersionSet{primarySet},
}
secondarySet := data.VersionSets[secondarySetIdx]
data.VersionSets[secondarySetIdx] = &persistencespb.CompatibleVersionSet{
SetIds: mergeSetIDs(primarySet.SetIds, secondarySet.SetIds),
BuildIds: secondarySet.BuildIds,
DefaultUpdateTimestamp: secondarySet.DefaultUpdateTimestamp,
QueueDefaultUpdateTimestamp: secondarySet.QueueDefaultUpdateTimestamp,
}
secondarySet.SetIds = mergeSetIDs(primarySet.SetIds, secondarySet.SetIds)
data = MergeVersioningData(justPrimaryData, data)
}

Expand Down Expand Up @@ -349,7 +344,7 @@ func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex,

func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) {
set := data.VersionSets[setIx]
set.QueueDefaultUpdateTimestamp = timestamp
set.BecameDefaultTimestamp = timestamp

if setIx < len(data.VersionSets)-1 {
// Move the set to the end and shift all the others down
Expand All @@ -359,16 +354,16 @@ func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *h
}

func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, versionIx int, timestamp *hlc.Clock) {
data.VersionSets[setIx].DefaultUpdateTimestamp = timestamp
setVersions := data.VersionSets[setIx].BuildIds
buildId := setVersions[versionIx]
buildId.BecameDefaultTimestamp = timestamp
if len(setVersions) <= 1 {
return
}
if versionIx < len(setVersions)-1 {
// Move the build ID to the end and shift all the others down
moveMe := setVersions[versionIx]
copy(setVersions[versionIx:], setVersions[versionIx+1:])
setVersions[len(setVersions)-1] = moveMe
setVersions[len(setVersions)-1] = buildId
}
}

Expand Down
70 changes: 25 additions & 45 deletions service/matching/version_sets_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,39 +80,29 @@ type buildIDInfo struct {
func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]buildIDInfo {
buildIDToInfo := make(map[string]buildIDInfo, 0)
for _, set := range sets {
lastIdx := len(set.BuildIds) - 1
for setIdx, buildID := range set.BuildIds {
for _, buildID := range set.BuildIds {
if info, found := buildIDToInfo[buildID.Id]; found {
// A build ID appears in more than one source, merge its information, and track it
state := info.state
stateUpdateTimestamp := hlc.Max(*buildID.StateUpdateTimestamp, info.stateUpdateTimestamp)
if hlc.Equal(stateUpdateTimestamp, *buildID.StateUpdateTimestamp) {
state = buildID.State
}
madeDefaultAt := info.madeDefaultAt
if setIdx == lastIdx {
madeDefaultAt = hlc.Max(*set.DefaultUpdateTimestamp, madeDefaultAt)
}

buildIDToInfo[buildID.Id] = buildIDInfo{
state: state,
stateUpdateTimestamp: stateUpdateTimestamp,
setIDs: mergeSetIDs(info.setIDs, set.SetIds),
madeDefaultAt: madeDefaultAt,
setMadeDefaultAt: hlc.Max(*set.QueueDefaultUpdateTimestamp, info.setMadeDefaultAt),
madeDefaultAt: hlc.Max(*buildID.BecameDefaultTimestamp, info.madeDefaultAt),
setMadeDefaultAt: hlc.Max(*set.BecameDefaultTimestamp, info.setMadeDefaultAt),
}
} else {
// A build ID was seen for the first time, track it
madeDefaultAt := hlc.Zero(0)
if setIdx == lastIdx {
madeDefaultAt = *set.DefaultUpdateTimestamp
}
buildIDToInfo[buildID.Id] = buildIDInfo{
state: buildID.State,
stateUpdateTimestamp: *buildID.StateUpdateTimestamp,
setIDs: set.SetIds,
madeDefaultAt: madeDefaultAt,
setMadeDefaultAt: *set.QueueDefaultUpdateTimestamp,
madeDefaultAt: *buildID.BecameDefaultTimestamp,
setMadeDefaultAt: *set.BecameDefaultTimestamp,
}
}
}
Expand All @@ -123,54 +113,44 @@ func collectBuildIdInfo(sets []*persistencespb.CompatibleVersionSet) map[string]
func intoVersionSets(buildIDToInfo map[string]buildIDInfo) []*persistencespb.CompatibleVersionSet {
sets := make([]*persistencespb.CompatibleVersionSet, 0)
for id, info := range buildIDToInfo {
info := info
set := findSetWithSetIDs(sets, info.setIDs)
if set == nil {
set = &persistencespb.CompatibleVersionSet{
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
DefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)),
QueueDefaultUpdateTimestamp: hlc.Ptr(hlc.Zero(0)),
SetIds: info.setIDs,
BuildIds: make([]*persistencespb.BuildId, 0),
BecameDefaultTimestamp: &info.setMadeDefaultAt,
}
sets = append(sets, set)
} else {
set.SetIds = mergeSetIDs(set.SetIds, info.setIDs)
set.BecameDefaultTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.BecameDefaultTimestamp))
}
timestamp := info.stateUpdateTimestamp
buildID := &persistencespb.BuildId{
Id: id,
State: info.state,
StateUpdateTimestamp: &timestamp,
}
defaultTimestamp := info.madeDefaultAt
set.QueueDefaultUpdateTimestamp = hlc.Ptr(hlc.Max(info.setMadeDefaultAt, *set.QueueDefaultUpdateTimestamp))

// Insert the build ID in the right order based on whether it is the default or by its update timestamp
if hlc.Greater(*set.DefaultUpdateTimestamp, defaultTimestamp) {
// Can't be the last element, it's the default already
lastIdx := len(set.BuildIds) - 1
for idx, curr := range set.BuildIds {
if idx == lastIdx || hlc.Greater(*curr.StateUpdateTimestamp, timestamp) {
// Insert just before
set.BuildIds = append(set.BuildIds[:idx+1], set.BuildIds[idx:]...)
set.BuildIds[idx] = buildID
break
}
}
} else {
set.DefaultUpdateTimestamp = &defaultTimestamp
set.BuildIds = append(set.BuildIds, buildID)
Id: id,
State: info.state,
StateUpdateTimestamp: &info.stateUpdateTimestamp,
BecameDefaultTimestamp: &info.madeDefaultAt,
}
set.BuildIds = append(set.BuildIds, buildID)
}
// Sort the sets based on their default update timestamp, ensuring the default set comes last
sortSets(sets)
for _, set := range sets {
sortBuildIds(set.BuildIds)
}
return sets
}

func sortSets(sets []*persistencespb.CompatibleVersionSet) {
sort.Slice(sets, func(i, j int) bool {
si := sets[i]
sj := sets[j]
return hlc.Less(*si.QueueDefaultUpdateTimestamp, *sj.QueueDefaultUpdateTimestamp)
return hlc.Less(*sets[i].BecameDefaultTimestamp, *sets[j].BecameDefaultTimestamp)
})
}

func sortBuildIds(buildIds []*persistencespb.BuildId) {
sort.Slice(buildIds, func(i, j int) bool {
return hlc.Less(*buildIds[i].BecameDefaultTimestamp, *buildIds[j].BecameDefaultTimestamp)
})
}

Expand Down
46 changes: 16 additions & 30 deletions service/matching/version_sets_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,18 @@ func buildID(wallclock int64, id string, optionalState ...persistencespb.BuildId
}

return &persistencespb.BuildId{
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
Id: id,
State: state,
StateUpdateTimestamp: fromWallClock(wallclock),
BecameDefaultTimestamp: fromWallClock(wallclock),
}
}

func mkBuildIds(buildIDs ...*persistencespb.BuildId) []*persistencespb.BuildId {
buildIDStructs := make([]*persistencespb.BuildId, len(buildIDs))
for i, buildID := range buildIDs {
buildIDStructs[i] = &persistencespb.BuildId{
Id: buildID.Id,
State: persistencespb.STATE_ACTIVE,
StateUpdateTimestamp: buildID.StateUpdateTimestamp,
}
}
return buildIDStructs
}

func mkSet(setID string, buildIDs ...*persistencespb.BuildId) *persistencespb.CompatibleVersionSet {
return &persistencespb.CompatibleVersionSet{
SetIds: []string{setID},
BuildIds: mkBuildIds(buildIDs...),
DefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp,
QueueDefaultUpdateTimestamp: buildIDs[len(buildIDs)-1].StateUpdateTimestamp,
SetIds: []string{setID},
BuildIds: buildIDs,
BecameDefaultTimestamp: buildIDs[len(buildIDs)-1].BecameDefaultTimestamp,
}
}

Expand Down Expand Up @@ -131,10 +119,9 @@ func TestSetMerge_DifferentSetIDs_MergesSetIDs(t *testing.T) {
b := mkSingleSetData("0.2", buildID(3, "0.2"))
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(6, "0.2")),
DefaultUpdateTimestamp: fromWallClock(6),
QueueDefaultUpdateTimestamp: fromWallClock(6),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(6, "0.2")},
BecameDefaultTimestamp: fromWallClock(6),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -158,10 +145,9 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {
}
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{{
SetIds: []string{"0.1", "0.2"},
BuildIds: mkBuildIds(buildID(1, "0.1"), buildID(3, "0.2")),
DefaultUpdateTimestamp: fromWallClock(3),
QueueDefaultUpdateTimestamp: fromWallClock(3),
SetIds: []string{"0.1", "0.2"},
BuildIds: []*persistencespb.BuildId{buildID(1, "0.1"), buildID(3, "0.2")},
BecameDefaultTimestamp: fromWallClock(3),
}},
}
assert.Equal(t, expected, MergeVersioningData(a, b))
Expand All @@ -170,16 +156,16 @@ func TestSetMerge_MultipleMatches_MergesSets(t *testing.T) {

func TestSetMerge_BuildIdPromoted_PreservesSetDefault(t *testing.T) {
a := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
a.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3)
a.VersionSets[0].BuildIds[len(a.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
b := mkSingleSetData("0.1", buildID(2, "0.1"), buildID(1, "0.2"))
b.VersionSets[0].DefaultUpdateTimestamp = fromWallClock(3)
b.VersionSets[0].BuildIds[len(b.VersionSets[0].BuildIds)-1].BecameDefaultTimestamp = fromWallClock(3)
assert.Equal(t, b, MergeVersioningData(a, b))
assert.Equal(t, b, MergeVersioningData(b, a))
}

func TestSetMerge_SetPromoted_PreservesGlobalDefault(t *testing.T) {
set01 := mkSet("0.1", buildID(1, "0.1"))
set01.QueueDefaultUpdateTimestamp = fromWallClock(3)
set01.BecameDefaultTimestamp = fromWallClock(3)
a := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{
mkSet("0.2", buildID(2, "0.2")),
Expand Down
Loading