Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Use the LazyUploadingTaskReader #minor (#246)
Browse files Browse the repository at this point in the history
* Using tasktemplate paths

Signed-off-by: Ketan Umare <[email protected]>

* Updated to use LazyUploadingTaskReader

Signed-off-by: Ketan Umare <[email protected]>

* Update flyteplugins to v0.5.41

Signed-off-by: Ketan Umare <[email protected]>

* config.yaml updated

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 committed May 25, 2021
1 parent c69bfc4 commit f8f28d5
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 25 deletions.
1 change: 0 additions & 1 deletion cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func initConfig(cmd *cobra.Command, _ []string) error {
return err
}

fmt.Printf("Started in-cluster mode\n")
return nil
}

Expand Down
7 changes: 4 additions & 3 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ propeller:
type: bucket
rate: 100
capacity: 1000
kube-config: "$HOME/.kube/config"
# This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container
kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
publish-k8s-events: true
workflowStore:
policy: "ResourceVersionCache"
Expand Down Expand Up @@ -61,7 +62,7 @@ plugins:
- FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
co-pilot:
name: "flyte-copilot-"
image: "flyteplugins:24c62d97452ce83ad6b4fd24e0eea2b4c44ff0c6"
image: "ghcr.io/flyteorg/flytecopilot:v0.5.28"
start-timeout: "5s"
sagemaker:
roleArn: "arn:aws:iam::123456789012:role/test-development"
Expand Down Expand Up @@ -94,7 +95,7 @@ event:
rate: 500
capacity: 1000
admin:
endpoint: localhost:80
endpoint: localhost:30081
insecure: true
catalog-cache:
type: noop
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.40
github.com/flyteorg/flyteplugins v0.5.41
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.40 h1:3Vaat/CzMv87hIuloVRKsPussO0271TUmtbCzBMTAN8=
github.com/flyteorg/flyteplugins v0.5.40/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flyteplugins v0.5.41 h1:8n1Z55P59ICV4453Dk7fhaUbB944j3BMZ+ozywHczgU=
github.com/flyteorg/flyteplugins v0.5.41/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -35,7 +34,7 @@ type TaskNodeHandler interface {
handler.Node
ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader,
r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig,
tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error)
tr ioutils.SimpleTaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error)
}

type metrics struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/nodes/dynamic/mocks/task_node_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,8 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r
if r := recover(); r != nil {
t.metrics.pluginPanics.Inc(ctx)
stack := debug.Stack()
logger.Errorf(ctx, "Panic in plugin.Abort for TaskType [%s]", tCtx.tr.GetTaskType())
err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", tCtx.tr.GetTaskType(), string(stack))
logger.Errorf(ctx, "Panic in plugin.Abort for TaskType [%s]", ttype)
err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", ttype, string(stack))
}
}()

Expand Down Expand Up @@ -728,8 +728,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext
if r := recover(); r != nil {
t.metrics.pluginPanics.Inc(ctx)
stack := debug.Stack()
logger.Errorf(ctx, "Panic in plugin.Finalize for TaskType [%s]", tCtx.tr.GetTaskType())
err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", tCtx.tr.GetTaskType(), string(stack))
logger.Errorf(ctx, "Panic in plugin.Finalize for TaskType [%s]", ttype)
err = fmt.Errorf("panic when executing a plugin for TaskType [%s]. Stack: [%s]", ttype, string(stack))
}
}()
childCtx := context.WithValue(ctx, pluginContextKey, p.GetID())
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/nodes/task/pre_post_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package task
import (
"context"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -71,7 +73,7 @@ func (t *Handler) CheckCatalogCache(ctx context.Context, tr pluginCore.TaskReade

func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader,
r io.OutputReader, outputCommitter io.OutputWriter, executionConfig v1alpha1.ExecutionConfig,
tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) {
tr ioutils.SimpleTaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error) {

tk, err := tr.Read(ctx)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type taskExecutionContext struct {
tm taskExecutionMetadata
rm resourcemanager.TaskResourceManager
psm *pluginStateManager
tr handler.TaskReader
tr pluginCore.TaskReader
ow *ioutils.BufferedOutputWriter
ber *bufferedEventRecorder
sm pluginCore.SecretManager
Expand Down Expand Up @@ -168,6 +168,11 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts)
}

taskTemplatePath, err := ioutils.GetTaskTemplatePath(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetDataDir())
if err != nil {
return nil, err
}

return &taskExecutionContext{
NodeExecutionContext: nCtx,
tm: taskExecutionMetadata{
Expand All @@ -179,7 +184,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
rm: resourcemanager.GetTaskResourceManager(
t.resourceManager, resourceNamespacePrefix, id),
psm: psm,
tr: nCtx.TaskReader(),
tr: ioutils.NewLazyUploadingTaskReader(nCtx.TaskReader(), taskTemplatePath, nCtx.DataStore()),
ow: ow,
ber: newBufferedEventRecorder(),
c: t.asyncCatalog,
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/nodes/task/taskexec_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {
assert.Equal(t, got.psm.newStateVersion, uint8(10))
assert.NotNil(t, got.psm.newState)

assert.Equal(t, got.TaskReader(), tr)
assert.NotNil(t, got.TaskReader())
assert.Equal(t, got.MaxDatasetSizeBytes(), int64(1))
assert.NotNil(t, got.SecretManager())

Expand All @@ -159,7 +159,6 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {

assert.EqualValues(t, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo(), make([]*event.ResourcePoolInfo, 0))

// TODO @kumare fix this test
assert.NotNil(t, got.rm)

_, err = got.rm.AllocateResource(context.TODO(), "foo", "token", pluginCore.ResourceConstraintsSpec{})
Expand Down

0 comments on commit f8f28d5

Please sign in to comment.