From 464028fab12197e4c6f4e0162ae45389ff16f1fa Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 28 Jul 2020 21:16:39 -0700 Subject: [PATCH] feat; Catalog caching & lineage information (#110) --- go.mod | 2 +- go.sum | 4 + pkg/repositories/config/migrations.go | 11 ++ .../gormimpl/node_execution_repo_test.go | 4 +- pkg/repositories/models/node_execution.go | 2 + .../transformers/node_execution.go | 20 +++- .../transformers/node_execution_test.go | 106 +++++++++++++----- 7 files changed, 115 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index c83db22ff..a288df48a 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jinzhu/gorm v1.9.12 github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/lib/pq v1.3.0 - github.com/lyft/flyteidl v0.17.36 + github.com/lyft/flyteidl v0.17.38 github.com/lyft/flytepropeller v0.2.64 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 diff --git a/go.sum b/go.sum index 76efec4f5..ba8c5bb31 100644 --- a/go.sum +++ b/go.sum @@ -458,6 +458,10 @@ github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.36 h1:frCsRL9h4aoe+VnQSUhWM9FqZXjCAXcmR96Jt3Y+qVE= github.com/lyft/flyteidl v0.17.36/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg= +github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg= +github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flyteplugins v0.3.33/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= diff --git a/pkg/repositories/config/migrations.go b/pkg/repositories/config/migrations.go index d7673a729..8c4e243ad 100644 --- a/pkg/repositories/config/migrations.go +++ b/pkg/repositories/config/migrations.go @@ -222,4 +222,15 @@ var Migrations = []*gormigrate.Migration{ return tx.Exec("ALTER TABLE executions DROP COLUMN IF EXISTS task_id").Error }, }, + + // NodeExecutions table has CacheStatus for Task nodes + { + ID: "2020-07-27-cachestatus", + Migrate: func(tx *gorm.DB) error { + return tx.AutoMigrate(&models.NodeExecution{}).Error + }, + Rollback: func(tx *gorm.DB) error { + return tx.Model(&models.NodeExecution{}).DropColumn("cache_status").Error + }, + }, } diff --git a/pkg/repositories/gormimpl/node_execution_repo_test.go b/pkg/repositories/gormimpl/node_execution_repo_test.go index 702d99f75..ddace1540 100644 --- a/pkg/repositories/gormimpl/node_execution_repo_test.go +++ b/pkg/repositories/gormimpl/node_execution_repo_test.go @@ -32,8 +32,8 @@ func TestCreateNodeExecution(t *testing.T) { nodeExecutionQuery := GlobalMock.NewMock() nodeExecutionQuery.WithQuery(`INSERT INTO "node_executions" ("id","created_at","updated_at","deleted_at",` + `"execution_project","execution_domain","execution_name","node_id","phase","input_uri","closure","started_at",` + - `"node_execution_created_at","node_execution_updated_at","duration","error_kind","error_code") VALUES ` + - `(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) + `"node_execution_created_at","node_execution_updated_at","duration","error_kind","error_code","cache_status") VALUES ` + + `(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) nodeExecutionEventQuery := GlobalMock.NewMock() nodeExecutionEventQuery.WithQuery(`INSERT INTO "node_execution_events" ("created_at","updated_at",` + diff --git a/pkg/repositories/models/node_execution.go b/pkg/repositories/models/node_execution.go index 11ae4387f..7dae1c8f7 100644 --- a/pkg/repositories/models/node_execution.go +++ b/pkg/repositories/models/node_execution.go @@ -37,4 +37,6 @@ type NodeExecution struct { ErrorKind *string `gorm:"index"` // Execution Error Code nullable. string value, but finite set determined by the execution engine and plugins ErrorCode *string + // If the node is of Type Task, this should always exist for a successful execution, indicating the cache status for the execution + CacheStatus *string } diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 6cec9e40b..9a403be0e 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -3,18 +3,20 @@ package transformers import ( "context" - "github.com/lyft/flyteadmin/pkg/common" "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteadmin/pkg/common" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flyteadmin/pkg/errors" - "github.com/lyft/flyteadmin/pkg/repositories/models" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/codes" + + "github.com/lyft/flyteadmin/pkg/errors" + "github.com/lyft/flyteadmin/pkg/repositories/models" ) type ToNodeExecutionModelInput struct { @@ -154,6 +156,18 @@ func UpdateNodeExecutionModel( } } + // Update TaskNodeMetadata, which includes caching information today. + if request.Event.GetTaskNodeMetadata() != nil { + st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + nodeExecutionClosure.TargetMetadata = &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CacheStatus: request.Event.GetTaskNodeMetadata().GetCacheStatus(), + CatalogKey: request.Event.GetTaskNodeMetadata().GetCatalogKey(), + }, + } + nodeExecutionModel.CacheStatus = &st + } + marshaledClosure, err := proto.Marshal(&nodeExecutionClosure) if err != nil { return errors.NewFlyteAdminErrorf( diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index e7f64388e..5bca4b72c 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -7,11 +7,12 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/proto" - "github.com/lyft/flyteadmin/pkg/repositories/models" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" "github.com/stretchr/testify/assert" + + "github.com/lyft/flyteadmin/pkg/repositories/models" ) var occurredAt = time.Now().UTC() @@ -158,38 +159,87 @@ func TestCreateNodeExecutionModel(t *testing.T) { } func TestUpdateNodeExecutionModel(t *testing.T) { - request := admin.NodeExecutionEventRequest{ - Event: &event.NodeExecutionEvent{ - Phase: core.NodeExecution_RUNNING, - OccurredAt: occurredAtProto, - TargetMetadata: &event.NodeExecutionEvent_WorkflowNodeMetadata{ - WorkflowNodeMetadata: &event.WorkflowNodeMetadata{ + t.Run("child-workflow", func(t *testing.T) { + request := admin.NodeExecutionEventRequest{ + Event: &event.NodeExecutionEvent{ + Phase: core.NodeExecution_RUNNING, + OccurredAt: occurredAtProto, + TargetMetadata: &event.NodeExecutionEvent_WorkflowNodeMetadata{ + WorkflowNodeMetadata: &event.WorkflowNodeMetadata{ + ExecutionId: childExecutionID, + }, + }, + }, + } + nodeExecutionModel := models.NodeExecution{ + Phase: core.NodeExecution_UNDEFINED.String(), + } + err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID) + assert.Nil(t, err) + assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase) + assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt) + assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt) + assert.Nil(t, nodeExecutionModel.CacheStatus) + + var closure = &admin.NodeExecutionClosure{ + Phase: core.NodeExecution_RUNNING, + StartedAt: occurredAtProto, + UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{ + WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{ ExecutionId: childExecutionID, }, }, - }, - } - nodeExecutionModel := models.NodeExecution{ - Phase: core.NodeExecution_UNDEFINED.String(), - } - err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID) - assert.Nil(t, err) - assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase) - assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt) - assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt) + } + var closureBytes, _ = proto.Marshal(closure) + assert.Equal(t, nodeExecutionModel.Closure, closureBytes) + }) - var closure = &admin.NodeExecutionClosure{ - Phase: core.NodeExecution_RUNNING, - StartedAt: occurredAtProto, - UpdatedAt: occurredAtProto, - TargetMetadata: &admin.NodeExecutionClosure_WorkflowNodeMetadata{ - WorkflowNodeMetadata: &admin.WorkflowNodeMetadata{ - ExecutionId: childExecutionID, + t.Run("task-node-metadata", func(t *testing.T) { + request := admin.NodeExecutionEventRequest{ + Event: &event.NodeExecutionEvent{ + Phase: core.NodeExecution_RUNNING, + OccurredAt: occurredAtProto, + TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{ + TaskNodeMetadata: &event.TaskNodeMetadata{ + CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, + CatalogKey: &core.CatalogMetadata{ + DatasetId: &core.Identifier{ + ResourceType: core.ResourceType_DATASET, + Name: "x", + Project: "proj", + Domain: "domain", + }, + }, + }, + }, }, - }, - } - var closureBytes, _ = proto.Marshal(closure) - assert.Equal(t, nodeExecutionModel.Closure, closureBytes) + } + nodeExecutionModel := models.NodeExecution{ + Phase: core.NodeExecution_UNDEFINED.String(), + } + err := UpdateNodeExecutionModel(&request, &nodeExecutionModel, childExecutionID) + assert.Nil(t, err) + assert.Equal(t, core.NodeExecution_RUNNING.String(), nodeExecutionModel.Phase) + assert.Equal(t, occurredAt, *nodeExecutionModel.StartedAt) + assert.EqualValues(t, occurredAt, *nodeExecutionModel.NodeExecutionUpdatedAt) + assert.NotNil(t, nodeExecutionModel.CacheStatus) + assert.Equal(t, *nodeExecutionModel.CacheStatus, request.Event.GetTaskNodeMetadata().CacheStatus.String()) + + var closure = &admin.NodeExecutionClosure{ + Phase: core.NodeExecution_RUNNING, + StartedAt: occurredAtProto, + UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, + CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + }, + }, + } + var closureBytes, _ = proto.Marshal(closure) + assert.Equal(t, nodeExecutionModel.Closure, closureBytes) + }) } func TestFromNodeExecutionModel(t *testing.T) {