Skip to content

Commit

Permalink
fix: Use deterministic execution names in scheduler (#5724)
Browse files Browse the repository at this point in the history
* feat: Use deterministic execution names in scheduler

Signed-off-by: Kevin Su <[email protected]>

* fix tests

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
pingsutw and eapolinario authored Sep 5, 2024
1 parent 5f69589 commit a058fd1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
14 changes: 14 additions & 0 deletions flyteadmin/pkg/common/naming/execution_name_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package naming

import (
"context"
"strings"
"testing"
"time"
Expand All @@ -9,6 +10,8 @@ import (

runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
runtimeMocks "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/mocks"
"github.com/flyteorg/flyte/flyteadmin/scheduler/identifier"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

const AllowedExecutionIDAlphabetStr = "abcdefghijklmnopqrstuvwxyz"
Expand Down Expand Up @@ -61,4 +64,15 @@ func TestGetExecutionName(t *testing.T) {
assert.Equal(t, 4, len(words), "FriendlyName should be split into exactly four words")
})

t.Run("deterministic name", func(t *testing.T) {
hashValue := identifier.HashScheduledTimeStamp(context.Background(), &core.Identifier{
Project: "Project",
Domain: "Domain",
Name: "Name",
Version: "Version",
}, time.Time{})

name := GetExecutionName(int64(hashValue))
assert.Equal(t, name, "carpet-juliet-kentucky-kentucky")
})
}
11 changes: 10 additions & 1 deletion flyteadmin/scheduler/executor/executor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/client-go/util/retry"

"github.com/flyteorg/flyte/flyteadmin/pkg/common/naming"
"github.com/flyteorg/flyte/flyteadmin/scheduler/identifier"
"github.com/flyteorg/flyte/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -52,7 +53,15 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model
}
}

executionName := naming.GetExecutionName(time.Now().UnixNano())
// Making the identifier deterministic using the hash of the identifier and scheduled time
hashValue := identifier.HashScheduledTimeStamp(ctx, &core.Identifier{
Project: s.Project,
Domain: s.Domain,
Name: s.Name,
Version: s.Version,
}, scheduledTime)

executionName := naming.GetExecutionName(int64(hashValue))
executionRequest := &admin.ExecutionCreateRequest{
Project: s.Project,
Domain: s.Domain,
Expand Down
6 changes: 3 additions & 3 deletions flyteadmin/scheduler/identifier/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func GetScheduleName(ctx context.Context, s models.SchedulableEntity) string {

// GetExecutionIdentifier returns UUID using the hashed value of the schedule identifier and the scheduledTime
func GetExecutionIdentifier(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) (uuid.UUID, error) {
hashValue := hashScheduledTimeStamp(ctx, identifier, scheduledTime)
hashValue := HashScheduledTimeStamp(ctx, identifier, scheduledTime)
b := make([]byte, 16)
binary.LittleEndian.PutUint64(b, hashValue)
return uuid.FromBytes(b)
Expand All @@ -55,8 +55,8 @@ func hashIdentifier(ctx context.Context, identifier *core.Identifier) uint64 {
return h.Sum64()
}

// hashScheduledTimeStamp return the hash of the identifier and the scheduledTime
func hashScheduledTimeStamp(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) uint64 {
// HashScheduledTimeStamp return the hash of the identifier and the scheduledTime
func HashScheduledTimeStamp(ctx context.Context, identifier *core.Identifier, scheduledTime time.Time) uint64 {
h := fnv.New64()
_, err := h.Write([]byte(fmt.Sprintf(executionIDInputsFormat,
identifier.Project, identifier.Domain, identifier.Name, identifier.Version, scheduledTime.Unix())))
Expand Down

0 comments on commit a058fd1

Please sign in to comment.