From cc3d24ff210a8ea58694976be35e26f9b4b28b48 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 24 Mar 2021 17:05:21 -0700 Subject: [PATCH 1/4] Using tasktemplate paths Signed-off-by: Ketan Umare --- go.mod | 2 +- go.sum | 4 ++-- pkg/controller/nodes/task/handler.go | 8 ++++---- pkg/controller/nodes/task/taskexec_context.go | 9 +++++++-- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index fad812291..6ba2b2645 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.25 - github.com/flyteorg/flyteplugins v0.5.40 + github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index cd3a42f2f..12e51accc 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8= github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.40 h1:3Vaat/CzMv87hIuloVRKsPussO0271TUmtbCzBMTAN8= -github.com/flyteorg/flyteplugins v0.5.40/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= +github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0 h1:Gy0N9rPjSw3z6b2LntWUBnQoQTGjlx/8Fk+lfF/SIL8= +github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index a5c22039f..bb28ae32e 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -672,8 +672,8 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r if r := recover(); r != nil { t.metrics.pluginPanics.Inc(ctx) stack := debug.Stack() - logger.Errorf(ctx, "Panic in plugin.Abort for TaskType [%s]", tCtx.tr.GetTaskType()) - err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", tCtx.tr.GetTaskType(), string(stack)) + logger.Errorf(ctx, "Panic in plugin.Abort for TaskType [%s]", ttype) + err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", ttype, string(stack)) } }() @@ -728,8 +728,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext if r := recover(); r != nil { t.metrics.pluginPanics.Inc(ctx) stack := debug.Stack() - logger.Errorf(ctx, "Panic in plugin.Finalize for TaskType [%s]", tCtx.tr.GetTaskType()) - err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", tCtx.tr.GetTaskType(), string(stack)) + logger.Errorf(ctx, "Panic in plugin.Finalize for TaskType [%s]", ttype) + err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", ttype, string(stack)) } }() childCtx := context.WithValue(ctx, pluginContextKey, p.GetID()) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 336b08c40..eca600137 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -69,7 +69,7 @@ type taskExecutionContext struct { tm taskExecutionMetadata rm resourcemanager.TaskResourceManager psm *pluginStateManager - tr handler.TaskReader + tr pluginCore.TaskReader ow *ioutils.BufferedOutputWriter ber *bufferedEventRecorder sm pluginCore.SecretManager @@ -168,6 +168,11 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts) } + taskTemplatePath, err := ioutils.GetTaskTemplatePath(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetDataDir()) + if err != nil { + return nil, err + } + return &taskExecutionContext{ NodeExecutionContext: nCtx, tm: taskExecutionMetadata{ @@ -179,7 +184,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node rm: resourcemanager.GetTaskResourceManager( t.resourceManager, resourceNamespacePrefix, id), psm: psm, - tr: nCtx.TaskReader(), + tr: ioutils.NewLazyUploadingTaskReader(nCtx.TaskReader(), taskTemplatePath, nCtx.DataStore()), ow: ow, ber: newBufferedEventRecorder(), c: t.asyncCatalog, From 7b573af51fe2d9f6d03565e32ae7784f325c7de4 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 24 Mar 2021 23:30:52 -0700 Subject: [PATCH 2/4] Updated to use LazyUploadingTaskReader Signed-off-by: Ketan Umare --- pkg/controller/nodes/dynamic/handler.go | 3 +-- .../nodes/dynamic/mocks/task_node_handler.go | 14 +++++++------- pkg/controller/nodes/task/pre_post_execution.go | 4 +++- pkg/controller/nodes/task/taskexec_context_test.go | 3 +-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 6ce388352..0225c740e 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -6,7 +6,6 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" - pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flytestdlib/logger" @@ -35,7 +34,7 @@ type TaskNodeHandler interface { handler.Node ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, - tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) + tr ioutils.SimpleTaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) } type metrics struct { diff --git a/pkg/controller/nodes/dynamic/mocks/task_node_handler.go b/pkg/controller/nodes/dynamic/mocks/task_node_handler.go index d7b33c6cd..98a4e0a97 100644 --- a/pkg/controller/nodes/dynamic/mocks/task_node_handler.go +++ b/pkg/controller/nodes/dynamic/mocks/task_node_handler.go @@ -7,12 +7,12 @@ import ( catalog "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" - core "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - handler "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" io "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" + ioutils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + mock "github.com/stretchr/testify/mock" v1alpha1 "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -198,7 +198,7 @@ func (_m TaskNodeHandler_ValidateOutputAndCacheAdd) Return(_a0 catalog.Status, _ return &TaskNodeHandler_ValidateOutputAndCacheAdd{Call: _m.Call.Return(_a0, _a1, _a2)} } -func (_m *TaskNodeHandler) OnValidateOutputAndCacheAdd(ctx context.Context, nodeID string, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, tr core.TaskReader, m catalog.Metadata) *TaskNodeHandler_ValidateOutputAndCacheAdd { +func (_m *TaskNodeHandler) OnValidateOutputAndCacheAdd(ctx context.Context, nodeID string, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, tr ioutils.SimpleTaskReader, m catalog.Metadata) *TaskNodeHandler_ValidateOutputAndCacheAdd { c := _m.On("ValidateOutputAndCacheAdd", ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m) return &TaskNodeHandler_ValidateOutputAndCacheAdd{Call: c} } @@ -209,18 +209,18 @@ func (_m *TaskNodeHandler) OnValidateOutputAndCacheAddMatch(matchers ...interfac } // ValidateOutputAndCacheAdd provides a mock function with given fields: ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m -func (_m *TaskNodeHandler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID string, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, tr core.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) { +func (_m *TaskNodeHandler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID string, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, tr ioutils.SimpleTaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) { ret := _m.Called(ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m) var r0 catalog.Status - if rf, ok := ret.Get(0).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, core.TaskReader, catalog.Metadata) catalog.Status); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, ioutils.SimpleTaskReader, catalog.Metadata) catalog.Status); ok { r0 = rf(ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m) } else { r0 = ret.Get(0).(catalog.Status) } var r1 *io.ExecutionError - if rf, ok := ret.Get(1).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, core.TaskReader, catalog.Metadata) *io.ExecutionError); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, ioutils.SimpleTaskReader, catalog.Metadata) *io.ExecutionError); ok { r1 = rf(ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m) } else { if ret.Get(1) != nil { @@ -229,7 +229,7 @@ func (_m *TaskNodeHandler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID } var r2 error - if rf, ok := ret.Get(2).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, core.TaskReader, catalog.Metadata) error); ok { + if rf, ok := ret.Get(2).(func(context.Context, string, io.InputReader, io.OutputReader, io.OutputWriter, v1alpha1.ExecutionConfig, ioutils.SimpleTaskReader, catalog.Metadata) error); ok { r2 = rf(ctx, nodeID, i, r, outputCommitter, executionConfig, tr, m) } else { r2 = ret.Error(2) diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index ad3ab8659..3f3a54ae7 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -3,6 +3,8 @@ package task import ( "context" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" @@ -71,7 +73,7 @@ func (t *Handler) CheckCatalogCache(ctx context.Context, tr pluginCore.TaskReade func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig, - tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) { + tr ioutils.SimpleTaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) { tk, err := tr.Read(ctx) if err != nil { diff --git a/pkg/controller/nodes/task/taskexec_context_test.go b/pkg/controller/nodes/task/taskexec_context_test.go index 1c545a0ac..943bf521a 100644 --- a/pkg/controller/nodes/task/taskexec_context_test.go +++ b/pkg/controller/nodes/task/taskexec_context_test.go @@ -144,7 +144,7 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { assert.Equal(t, got.psm.newStateVersion, uint8(10)) assert.NotNil(t, got.psm.newState) - assert.Equal(t, got.TaskReader(), tr) + assert.NotNil(t, got.TaskReader()) assert.Equal(t, got.MaxDatasetSizeBytes(), int64(1)) assert.NotNil(t, got.SecretManager()) @@ -159,7 +159,6 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { assert.EqualValues(t, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo(), make([]*event.ResourcePoolInfo, 0)) - // TODO @kumare fix this test assert.NotNil(t, got.rm) _, err = got.rm.AllocateResource(context.TODO(), "foo", "token", pluginCore.ResourceConstraintsSpec{}) From 0a37ff9c95140c9543b7b252e6c7d78c504c620a Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 26 Mar 2021 15:00:12 -0700 Subject: [PATCH 3/4] Update flyteplugins to v0.5.41 Signed-off-by: Ketan Umare --- cmd/controller/cmd/root.go | 1 - config.yaml | 6 +++--- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cmd/controller/cmd/root.go b/cmd/controller/cmd/root.go index 3b0ec4808..11973771c 100644 --- a/cmd/controller/cmd/root.go +++ b/cmd/controller/cmd/root.go @@ -107,7 +107,6 @@ func initConfig(cmd *cobra.Command, _ []string) error { return err } - fmt.Printf("Started in-cluster mode\n") return nil } diff --git a/config.yaml b/config.yaml index 5cbbd2894..d23e706af 100644 --- a/config.yaml +++ b/config.yaml @@ -22,7 +22,7 @@ propeller: type: bucket rate: 100 capacity: 1000 - kube-config: "$HOME/.kube/config" + kube-config: "$HOME/kubeconfig/k3s/k3s.yaml" publish-k8s-events: true workflowStore: policy: "ResourceVersionCache" @@ -61,7 +61,7 @@ plugins: - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage co-pilot: name: "flyte-copilot-" - image: "flyteplugins:24c62d97452ce83ad6b4fd24e0eea2b4c44ff0c6" + image: "ghcr.io/flyteorg/flytecopilot:v0.5.28" start-timeout: "5s" sagemaker: roleArn: "arn:aws:iam::123456789012:role/test-development" @@ -94,7 +94,7 @@ event: rate: 500 capacity: 1000 admin: - endpoint: localhost:80 + endpoint: localhost:30081 insecure: true catalog-cache: type: noop diff --git a/go.mod b/go.mod index 6ba2b2645..6c9eecae7 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.18.25 - github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0 + github.com/flyteorg/flyteplugins v0.5.41 github.com/flyteorg/flytestdlib v0.3.13 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 12e51accc..8cc5cb2b3 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8= github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI= -github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0 h1:Gy0N9rPjSw3z6b2LntWUBnQoQTGjlx/8Fk+lfF/SIL8= -github.com/flyteorg/flyteplugins v0.5.41-0.20210325000109-3b0a6f610ea0/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= +github.com/flyteorg/flyteplugins v0.5.41 h1:8n1Z55P59ICV4453Dk7fhaUbB944j3BMZ+ozywHczgU= +github.com/flyteorg/flyteplugins v0.5.41/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w= github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= From 9da7337fdcb9ec6faa4c09973e707abfd1e3dff2 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 26 Mar 2021 16:01:10 -0700 Subject: [PATCH 4/4] config.yaml updated Signed-off-by: Ketan Umare --- config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/config.yaml b/config.yaml index d23e706af..01bb7501b 100644 --- a/config.yaml +++ b/config.yaml @@ -22,6 +22,7 @@ propeller: type: bucket rate: 100 capacity: 1000 + # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container kube-config: "$HOME/kubeconfig/k3s/k3s.yaml" publish-k8s-events: true workflowStore: