Skip to content

Commit

Permalink
feat; Catalog caching & lineage information (flyteorg#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Jul 29, 2020
1 parent 06d9b79 commit 464028f
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/jinzhu/gorm v1.9.12
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.17.36
github.com/lyft/flyteidl v0.17.38
github.com/lyft/flytepropeller v0.2.64
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U
github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.36 h1:frCsRL9h4aoe+VnQSUhWM9FqZXjCAXcmR96Jt3Y+qVE=
github.com/lyft/flyteidl v0.17.36/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg=
github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flyteplugins v0.3.33/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
Expand Down
11 changes: 11 additions & 0 deletions pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,15 @@ var Migrations = []*gormigrate.Migration{
return tx.Exec("ALTER TABLE executions DROP COLUMN IF EXISTS task_id").Error
},
},

// NodeExecutions table has CacheStatus for Task nodes
{
ID: "2020-07-27-cachestatus",
Migrate: func(tx *gorm.DB) error {
return tx.AutoMigrate(&models.NodeExecution{}).Error
},
Rollback: func(tx *gorm.DB) error {
return tx.Model(&models.NodeExecution{}).DropColumn("cache_status").Error
},
},
}
4 changes: 2 additions & 2 deletions pkg/repositories/gormimpl/node_execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestCreateNodeExecution(t *testing.T) {
nodeExecutionQuery := GlobalMock.NewMock()
nodeExecutionQuery.WithQuery(`INSERT INTO "node_executions" ("id","created_at","updated_at","deleted_at",` +
`"execution_project","execution_domain","execution_name","node_id","phase","input_uri","closure","started_at",` +
`"node_execution_created_at","node_execution_updated_at","duration","error_kind","error_code") VALUES ` +
`(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`)
`"node_execution_created_at","node_execution_updated_at","duration","error_kind","error_code","cache_status") VALUES ` +
`(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`)

nodeExecutionEventQuery := GlobalMock.NewMock()
nodeExecutionEventQuery.WithQuery(`INSERT INTO "node_execution_events" ("created_at","updated_at",` +
Expand Down
2 changes: 2 additions & 0 deletions pkg/repositories/models/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ type NodeExecution struct {
ErrorKind *string `gorm:"index"`
// Execution Error Code nullable. string value, but finite set determined by the execution engine and plugins
ErrorCode *string
// If the node is of Type Task, this should always exist for a successful execution, indicating the cache status for the execution
CacheStatus *string
}
20 changes: 17 additions & 3 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package transformers
import (
"context"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteadmin/pkg/common"

"github.com/golang/protobuf/proto"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"google.golang.org/grpc/codes"

"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/repositories/models"
)

type ToNodeExecutionModelInput struct {
Expand Down Expand Up @@ -154,6 +156,18 @@ func UpdateNodeExecutionModel(
}
}

// Update TaskNodeMetadata, which includes caching information today.
if request.Event.GetTaskNodeMetadata() != nil {
st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String()
nodeExecutionClosure.TargetMetadata = &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{
CacheStatus: request.Event.GetTaskNodeMetadata().GetCacheStatus(),
CatalogKey: request.Event.GetTaskNodeMetadata().GetCatalogKey(),
},
}
nodeExecutionModel.CacheStatus = &st
}

marshaledClosure, err := proto.Marshal(&nodeExecutionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(
Expand Down
106 changes: 78 additions & 28 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/golang/protobuf/ptypes"

"github.com/golang/protobuf/proto"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/stretchr/testify/assert"

"github.com/lyft/flyteadmin/pkg/repositories/models"
)

var occurredAt = time.Now().UTC()
Expand Down Expand Up @@ -158,38 +159,87 @@ func TestCreateNodeExecutionModel(t *testing.T) {
}

func TestUpdateNodeExecutionModel(t *testing.T) {
request := admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: occurredAtProto,
TargetMetadata: &event.NodeExecutionEvent_WorkflowNodeMetadata{
WorkflowNodeMetadata: &event.WorkflowNodeMetadata{
t.Run("child-workflow", func(t *testing.T) {
request := admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: occurredAtProto,
TargetMetadata: &event.NodeExecutionEvent_WorkflowNodeMetadata{
WorkflowNodeMetadata: &event.WorkflowNodeMetadata{
ExecutionId: childExecutionID,
},
},
},
}
nodeExecutionModel := models.NodeExecution{
Phase: core.NodeExecution_UNDEFINED.String(),
}
err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID)
assert.Nil(t, err)
assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase)
assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt)
assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt)
assert.Nil(t, nodeExecutionModel.CacheStatus)

var closure = &admin.NodeExecutionClosure{
Phase: core.NodeExecution_RUNNING,
StartedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{
WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{
ExecutionId: childExecutionID,
},
},
},
}
nodeExecutionModel := models.NodeExecution{
Phase: core.NodeExecution_UNDEFINED.String(),
}
err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID)
assert.Nil(t, err)
assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase)
assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt)
assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt)
}
var closureBytes, _ = proto.Marshal(closure)
assert.Equal(t, nodeExecutionModel.Closure, closureBytes)
})

var closure = &admin.NodeExecutionClosure{
Phase: core.NodeExecution_RUNNING,
StartedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{
WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{
ExecutionId: childExecutionID,
t.Run("task-node-metadata", func(t *testing.T) {
request := admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: occurredAtProto,
TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{
TaskNodeMetadata: &event.TaskNodeMetadata{
CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED,
CatalogKey: &core.CatalogMetadata{
DatasetId: &core.Identifier{
ResourceType: core.ResourceType_DATASET,
Name: "x",
Project: "proj",
Domain: "domain",
},
},
},
},
},
},
}
var closureBytes, _ = proto.Marshal(closure)
assert.Equal(t, nodeExecutionModel.Closure, closureBytes)
}
nodeExecutionModel := models.NodeExecution{
Phase: core.NodeExecution_UNDEFINED.String(),
}
err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID)
assert.Nil(t, err)
assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase)
assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt)
assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt)
assert.NotNil(t, nodeExecutionModel.CacheStatus)
assert.Equal(t, *nodeExecutionModel.CacheStatus, request.Event.GetTaskNodeMetadata().CacheStatus.String())

var closure = &admin.NodeExecutionClosure{
Phase: core.NodeExecution_RUNNING,
StartedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{
CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus,
CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey,
},
},
}
var closureBytes, _ = proto.Marshal(closure)
assert.Equal(t, nodeExecutionModel.Closure, closureBytes)
})
}

func TestFromNodeExecutionModel(t *testing.T) {
Expand Down

0 comments on commit 464028f

Please sign in to comment.