Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Support GeneratedNameMaxLength property
Browse files Browse the repository at this point in the history
Signed-off-by: Filipe Regadas <[email protected]>
  • Loading branch information
regadas committed Mar 23, 2021
1 parent 979fabe commit 3bcaf57
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 14 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ require (
sigs.k8s.io/controller-runtime v0.8.2
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
replace (
github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
github.com/flyteorg/flyteplugins => ../flyteplugins
)
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
Expand Down Expand Up @@ -234,8 +233,6 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.24 h1:Y4+y/tu6Qsb3jNXxuVsflycfSocfthUi6XsMgJTfGuc=
github.com/flyteorg/flyteidl v0.18.24/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -1230,7 +1227,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
logger.Infof(ctx, "Node level caching is disabled. Skipping catalog read.")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down Expand Up @@ -645,7 +645,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r
return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down Expand Up @@ -701,7 +701,7 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext
return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,10 @@ func Test_task_Handle_Barrier(t *testing.T) {
assert.NoError(t, err)
tk.resourceManager = noopRm

tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, "plugin1")
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("plugin1")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, p)
assert.NoError(t, err)
id := tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()

Expand Down Expand Up @@ -1322,12 +1325,14 @@ func Test_task_Abort(t *testing.T) {
{"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nil}, true, true},
{"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true},
Expand Down Expand Up @@ -1464,12 +1469,14 @@ func Test_task_Abort_v1(t *testing.T) {
{"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nil}, true, true},
{"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true},
Expand Down Expand Up @@ -1603,12 +1610,14 @@ func Test_task_Finalize(t *testing.T) {
{"finalize-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Finalize", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nCtx: nCtx}, true, true},
{"finalize-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Finalize", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{nCtx: nCtx}, false, true},
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type PluginManager struct {
resourceLevelMonitor *ResourceLevelMonitor
disableInjectOwnerReferences bool
disableInjectFinalizer bool
GeneratedNameMaxLength *int
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
Expand All @@ -120,7 +121,9 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
return pluginsCore.PluginProperties{}
return pluginsCore.PluginProperties{
GeneratedNameMaxLength: e.GeneratedNameMaxLength,
}
}

func (e *PluginManager) GetID() string {
Expand Down Expand Up @@ -534,6 +537,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
resourceLevelMonitor: rm,
disableInjectOwnerReferences: entry.DisableInjectOwnerReferences,
disableInjectFinalizer: entry.DisableInjectFinalizer,
GeneratedNameMaxLength: entry.GeneratedNameMaxLength,
}, nil
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (t taskExecutionContext) SecretManager() pluginCore.SecretManager {
return t.sm
}

func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, pluginID string) (*taskExecutionContext, error) {
func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) {
id := GetTaskExecutionIdentifier(nCtx)

currentNodeUniqueID := nCtx.NodeID()
Expand All @@ -136,7 +136,15 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
}
}

uniqueID, err := utils.FixedLengthUniqueIDForParts(IDMaxLength, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt)))
length := IDMaxLength
if l := plugin.GetProperties().GeneratedNameMaxLength; l != nil {
length = *l
if length <= 0 {
return nil, errors.Errorf(errors.RuntimeExecutionError, nCtx.NodeID(), "GeneratedNameMaxLength shoud greater then 0")
}
}

uniqueID, err := utils.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt)))
if err != nil {
// SHOULD never really happen
return nil, err
Expand All @@ -157,7 +165,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
return nil, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "unable to initialize plugin state manager")
}

resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(pluginID))
resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(plugin.GetID()))
maxAttempts := uint32(DefaultMaxAttempts)
if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil {
maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts)
Expand Down
16 changes: 15 additions & 1 deletion pkg/controller/nodes/task/taskexec_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog/mocks"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flytestdlib/promutils"
Expand Down Expand Up @@ -116,7 +118,10 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {
resourceManager: noopRm,
}

got, err := tk.newTaskExecutionContext(context.TODO(), nCtx, "plugin1")
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("plugin1")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
got, err := tk.newTaskExecutionContext(context.TODO(), nCtx, p)
assert.NoError(t, err)
assert.NotNil(t, got)

Expand Down Expand Up @@ -152,4 +157,13 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {
assert.NotNil(t, got.ResourceManager())
assert.Nil(t, got.Catalog())
// assert.Equal(t, got.InputReader(), ir)

anotherPlugin := &pluginCoreMocks.Plugin{}
anotherPlugin.On("GetID").Return("plugin2")
maxLength := 8
anotherPlugin.OnGetProperties().Return(pluginCore.PluginProperties{
GeneratedNameMaxLength: &maxLength,
})
anotherTaskExecCtx, _ := tk.newTaskExecutionContext(context.TODO(), nCtx, anotherPlugin)
assert.Equal(t, anotherTaskExecCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), "fpmmhh6q")
}

0 comments on commit 3bcaf57

Please sign in to comment.