From a058fd15c5bf7a4f027295d2f7bb4d9851ccf094 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 5 Sep 2024 11:07:09 -0700 Subject: [PATCH] fix: Use deterministic execution names in scheduler (#5724) * feat: Use deterministic execution names in scheduler Signed-off-by: Kevin Su * fix tests Signed-off-by: Kevin Su * lint Signed-off-by: Kevin Su * lint Signed-off-by: Kevin Su --------- Signed-off-by: Kevin Su Signed-off-by: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Co-authored-by: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> --- .../pkg/common/naming/execution_name_test.go | 14 ++++++++++++++ flyteadmin/scheduler/executor/executor_impl.go | 11 ++++++++++- flyteadmin/scheduler/identifier/identifier.go | 6 +++--- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/flyteadmin/pkg/common/naming/execution_name_test.go b/flyteadmin/pkg/common/naming/execution_name_test.go index 22729dbb9b..91f04d3bf1 100644 --- a/flyteadmin/pkg/common/naming/execution_name_test.go +++ b/flyteadmin/pkg/common/naming/execution_name_test.go @@ -1,6 +1,7 @@ package naming import ( + "context" "strings" "testing" "time" @@ -9,6 +10,8 @@ import ( runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" runtimeMocks "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/mocks" + "github.com/flyteorg/flyte/flyteadmin/scheduler/identifier" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" ) const AllowedExecutionIDAlphabetStr = "abcdefghijklmnopqrstuvwxyz" @@ -61,4 +64,15 @@ func TestGetExecutionName(t *testing.T) { assert.Equal(t, 4, len(words), "FriendlyName should be split into exactly four words") }) + t.Run("deterministic name", func(t *testing.T) { + hashValue := identifier.HashScheduledTimeStamp(context.Background(), &core.Identifier{ + Project: "Project", + Domain: "Domain", + Name: "Name", + Version: "Version", + }, time.Time{}) + + name := GetExecutionName(int64(hashValue)) + assert.Equal(t, name, "carpet-juliet-kentucky-kentucky") + }) } diff --git a/flyteadmin/scheduler/executor/executor_impl.go b/flyteadmin/scheduler/executor/executor_impl.go index f3fd86c6cf..e269d79b2a 100644 --- a/flyteadmin/scheduler/executor/executor_impl.go +++ b/flyteadmin/scheduler/executor/executor_impl.go @@ -12,6 +12,7 @@ import ( "k8s.io/client-go/util/retry" "github.com/flyteorg/flyte/flyteadmin/pkg/common/naming" + "github.com/flyteorg/flyte/flyteadmin/scheduler/identifier" "github.com/flyteorg/flyte/flyteadmin/scheduler/repositories/models" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -52,7 +53,15 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model } } - executionName := naming.GetExecutionName(time.Now().UnixNano()) + // Making the identifier deterministic using the hash of the identifier and scheduled time + hashValue := identifier.HashScheduledTimeStamp(ctx, &core.Identifier{ + Project: s.Project, + Domain: s.Domain, + Name: s.Name, + Version: s.Version, + }, scheduledTime) + + executionName := naming.GetExecutionName(int64(hashValue)) executionRequest := &admin.ExecutionCreateRequest{ Project: s.Project, Domain: s.Domain, diff --git a/flyteadmin/scheduler/identifier/identifier.go b/flyteadmin/scheduler/identifier/identifier.go index 5d386e8652..caf5c94296 100644 --- a/flyteadmin/scheduler/identifier/identifier.go +++ b/flyteadmin/scheduler/identifier/identifier.go @@ -34,7 +34,7 @@ func GetScheduleName(ctx context.Context, s models.SchedulableEntity) string { // GetExecutionIdentifier returns UUID using the hashed value of the schedule identifier and the scheduledTime func GetExecutionIdentifier(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) (uuid.UUID, error) { - hashValue := hashScheduledTimeStamp(ctx, identifier, scheduledTime) + hashValue := HashScheduledTimeStamp(ctx, identifier, scheduledTime) b := make([]byte, 16) binary.LittleEndian.PutUint64(b, hashValue) return uuid.FromBytes(b) @@ -55,8 +55,8 @@ func hashIdentifier(ctx context.Context, identifier *core.Identifier) uint64 { return h.Sum64() } -// hashScheduledTimeStamp return the hash of the identifier and the scheduledTime -func hashScheduledTimeStamp(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) uint64 { +// HashScheduledTimeStamp return the hash of the identifier and the scheduledTime +func HashScheduledTimeStamp(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) uint64 { h := fnv.New64() _, err := h.Write([]byte(fmt.Sprintf(executionIDInputsFormat, identifier.Project, identifier.Domain, identifier.Name, identifier.Version, scheduledTime.Unix())))