From c81a0a840490753e1a4b92d96a700094d11dd99d Mon Sep 17 00:00:00 2001 From: Andrew Chan Date: Mon, 9 Sep 2019 12:30:37 -0700 Subject: [PATCH 1/5] Cache task executions to DataCatalog --- Gopkg.lock | 14 + Gopkg.toml | 5 + config.yaml | 3 + pkg/controller/catalog/catalog_client.go | 20 +- .../{discovery_config.go => config.go} | 6 +- pkg/controller/catalog/config_flags.go | 38 +- pkg/controller/catalog/config_flags_test.go | 46 +- .../catalog/datacatalog/datacatalog.go | 291 ++++++++++++ .../catalog/datacatalog/datacatalog_test.go | 427 ++++++++++++++++++ .../datacatalog/mocks/DataCatalogClient.go | 163 +++++++ .../transformer/datacatalog_transformer.go | 144 ++++++ .../datacatalog_transformer_test.go | 132 ++++++ pkg/controller/controller.go | 5 +- pkg/controller/nodes/executor_test.go | 16 +- pkg/controller/nodes/task/handler.go | 19 +- pkg/controller/workflow/executor_test.go | 18 +- 16 files changed, 1265 insertions(+), 82 deletions(-) rename pkg/controller/catalog/{discovery_config.go => config.go} (73%) create mode 100644 pkg/controller/catalog/datacatalog/datacatalog.go create mode 100644 pkg/controller/catalog/datacatalog/datacatalog_test.go create mode 100644 pkg/controller/catalog/datacatalog/mocks/DataCatalogClient.go create mode 100644 pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go create mode 100644 pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 8ac4731381..513d0366e3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -447,6 +447,15 @@ revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" version = "v1.0.2" +[[projects]] + digest = "1:eaef68aaa87572012b236975f558582a043044c21be3fda97921c4871fb4298f" + name = "github.com/lyft/datacatalog" + packages = ["protos/gen"] + pruneopts = "" + revision = "0da0ffbb4705efd5d5ecd04ea560b35d968beb86" + source = "https://github.com/lyft/datacatalog" + version = "v0.1.0" + [[projects]] digest = 
"1:ef7b24655c09b19a0b397e8a58f8f15fc402b349484afad6ce1de0a8f21bb292" name = "github.com/lyft/flyteidl" @@ -1338,12 +1347,15 @@ "github.com/DiSiqueira/GoTree", "github.com/fatih/color", "github.com/ghodss/yaml", + "github.com/gogo/protobuf/proto", "github.com/golang/protobuf/jsonpb", "github.com/golang/protobuf/proto", "github.com/golang/protobuf/ptypes", "github.com/golang/protobuf/ptypes/struct", "github.com/golang/protobuf/ptypes/timestamp", + "github.com/google/uuid", "github.com/grpc-ecosystem/go-grpc-middleware/retry", + "github.com/lyft/datacatalog/protos/gen", "github.com/lyft/flyteidl/clients/go/admin", "github.com/lyft/flyteidl/clients/go/admin/mocks", "github.com/lyft/flyteidl/clients/go/coreutils", @@ -1384,6 +1396,7 @@ "golang.org/x/time/rate", "google.golang.org/grpc", "google.golang.org/grpc/codes", + "google.golang.org/grpc/credentials", "google.golang.org/grpc/status", "k8s.io/api/batch/v1", "k8s.io/api/core/v1", @@ -1399,6 +1412,7 @@ "k8s.io/apimachinery/pkg/util/rand", "k8s.io/apimachinery/pkg/util/runtime", "k8s.io/apimachinery/pkg/util/sets", + "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery", diff --git a/Gopkg.toml b/Gopkg.toml index 7d7e504096..c908a00657 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -51,6 +51,11 @@ required = [ source = "https://github.com/lyft/flyteidl" version = "^0.14.x" +[[constraint]] + name = "github.com/lyft/datacatalog" + source = "https://github.com/lyft/datacatalog" + version = "^0.1.x" + [[constraint]] name = "github.com/lyft/flyteplugins" source = "https://github.com/lyft/flyteplugins" diff --git a/config.yaml b/config.yaml index c50dbc4b31..6bb16eea30 100644 --- a/config.yaml +++ b/config.yaml @@ -88,6 +88,9 @@ event: admin: endpoint: localhost:8089 insecure: true +catalog-cache: + type: catalog + endpoint: datacatalog:8089 errors: show-source: true logger: diff --git a/pkg/controller/catalog/catalog_client.go 
b/pkg/controller/catalog/catalog_client.go index 53a9b8aa5a..af24c2b4d0 100644 --- a/pkg/controller/catalog/catalog_client.go +++ b/pkg/controller/catalog/catalog_client.go @@ -3,7 +3,10 @@ package catalog import ( "context" + "fmt" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/controller/catalog/datacatalog" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/storage" ) @@ -13,16 +16,25 @@ type Client interface { Put(ctx context.Context, task *core.TaskTemplate, execID *core.TaskExecutionIdentifier, inputPath storage.DataReference, outputPath storage.DataReference) error } -func NewCatalogClient(store storage.ProtobufStore) Client { +func NewCatalogClient(ctx context.Context, store storage.ProtobufStore) (Client, error) { catalogConfig := GetConfig() var catalogClient Client - if catalogConfig.Type == LegacyDiscoveryType { + var err error + switch catalogConfig.Type { + case LegacyDiscoveryType: catalogClient = NewLegacyDiscovery(catalogConfig.Endpoint, store) - } else if catalogConfig.Type == NoOpDiscoveryType { + case DataCatalogType: + catalogClient, err = datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Secure, store) + if err != nil { + return nil, err + } + case NoOpDiscoveryType, "": catalogClient = NewNoOpDiscovery() + default: + return nil, fmt.Errorf("No such catalog type available: %v", catalogConfig.Type) } logger.Infof(context.Background(), "Created Catalog client, type: %v", catalogConfig.Type) - return catalogClient + return catalogClient, nil } diff --git a/pkg/controller/catalog/discovery_config.go b/pkg/controller/catalog/config.go similarity index 73% rename from pkg/controller/catalog/discovery_config.go rename to pkg/controller/catalog/config.go index bbc1bab8ff..6d4f2c3376 100644 --- a/pkg/controller/catalog/discovery_config.go +++ b/pkg/controller/catalog/config.go @@ -21,11 +21,13 @@ type DiscoveryType = string const ( NoOpDiscoveryType DiscoveryType = "noop" 
LegacyDiscoveryType DiscoveryType = "legacy" + DataCatalogType DiscoveryType = "datacatalog" ) type Config struct { - Type DiscoveryType `json:"type" pflag:"\"noop\",Discovery Implementation to use"` - Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for discovery service"` + Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` + Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` + Secure bool `json:"secure" pflag:"true, Connect with TLS/SSL"` } // Gets loaded config for Discovery diff --git a/pkg/controller/catalog/config_flags.go b/pkg/controller/catalog/config_flags.go index d67dd751ac..b2359a462d 100755 --- a/pkg/controller/catalog/config_flags.go +++ b/pkg/controller/catalog/config_flags.go @@ -1,47 +1,21 @@ // Code generated by go generate; DO NOT EDIT. -// This file was generated by robots. +// This file was generated by robots at +// 2019-09-05 05:37:07.301294018 -0700 PDT m=+14.696460456 package catalog import ( - "encoding/json" - "reflect" - "fmt" "github.com/spf13/pflag" ) -// If v is a pointer, it will get its element value or the zero value of the element type. -// If v is not a pointer, it will return it as is. -func (Config) elemValueOrNil(v interface{}) interface{} { - if t := reflect.TypeOf(v); t.Kind() == reflect.Ptr { - if reflect.ValueOf(v).IsNil() { - return reflect.Zero(t.Elem()).Interface() - } else { - return reflect.ValueOf(v).Interface() - } - } else if v == nil { - return reflect.Zero(t).Interface() - } - - return v -} - -func (Config) mustMarshalJSON(v json.Marshaler) string { - raw, err := v.MarshalJSON() - if err != nil { - panic(err) - } - - return string(raw) -} - // GetPFlagSet will return strongly types pflags for all fields in Config and its nested types. The format of the // flags is json-name.json-sub-name... etc. 
-func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { +func (Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "type"), defaultConfig.Type, "Discovery Implementation to use") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "endpoint"), defaultConfig.Endpoint, " Endpoint for discovery service") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "type"), "noop", " Catalog Implementation to use") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "endpoint"), "", " Endpoint for catalog service") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "secure"), true, " Connect with TLS/SSL") return cmdFlags } diff --git a/pkg/controller/catalog/config_flags_test.go b/pkg/controller/catalog/config_flags_test.go index a2538822b8..e5ce16b044 100755 --- a/pkg/controller/catalog/config_flags_test.go +++ b/pkg/controller/catalog/config_flags_test.go @@ -1,5 +1,6 @@ // Code generated by go generate; DO NOT EDIT. -// This file was generated by robots. 
+// This file was generated by robots at +// 2019-09-05 05:37:07.301294018 -0700 PDT m=+14.696460456 package catalog @@ -7,7 +8,6 @@ import ( "encoding/json" "fmt" "reflect" - "strings" "testing" "github.com/mitchellh/mapstructure" @@ -70,16 +70,6 @@ func decode_Config(input, result interface{}) error { return decoder.Decode(input) } -func join_Config(arr interface{}, sep string) string { - listValue := reflect.ValueOf(arr) - strs := make([]string, 0, listValue.Len()) - for i := 0; i < listValue.Len(); i++ { - strs = append(strs, fmt.Sprintf("%v", listValue.Index(i))) - } - - return strings.Join(strs, sep) -} - func testDecodeJson_Config(t *testing.T, val, result interface{}) { assert.NoError(t, decode_Config(val, result)) } @@ -103,16 +93,14 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly if vString, err := cmdFlags.GetString("type"); err == nil { - assert.Equal(t, string(defaultConfig.Type), vString) + assert.Equal(t, "noop", vString) } else { assert.FailNow(t, err.Error()) } }) t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("type", testValue) + cmdFlags.Set("type", "1") if vString, err := cmdFlags.GetString("type"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.Type) @@ -125,16 +113,14 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly if vString, err := cmdFlags.GetString("endpoint"); err == nil { - assert.Equal(t, string(defaultConfig.Endpoint), vString) + assert.Equal(t, "", vString) } else { assert.FailNow(t, err.Error()) } }) t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("endpoint", testValue) + cmdFlags.Set("endpoint", "1") if vString, err := cmdFlags.GetString("endpoint"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.Endpoint) @@ -143,4 +129,24 @@ func TestConfig_SetFlags(t *testing.T) 
{ } }) }) + t.Run("Test_secure", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vBool, err := cmdFlags.GetBool("secure"); err == nil { + assert.Equal(t, true, vBool) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + cmdFlags.Set("secure", "1") + if vBool, err := cmdFlags.GetBool("secure"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.Secure) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/pkg/controller/catalog/datacatalog/datacatalog.go b/pkg/controller/catalog/datacatalog/datacatalog.go new file mode 100644 index 0000000000..672da3f3f5 --- /dev/null +++ b/pkg/controller/catalog/datacatalog/datacatalog.go @@ -0,0 +1,291 @@ +package datacatalog + +import ( + "context" + "crypto/x509" + "time" + + "fmt" + + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + datacatalog "github.com/lyft/datacatalog/protos/gen" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/controller/catalog/datacatalog/transformer" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/storage" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/util/uuid" +) + +const ( + taskVersionKey = "task-version" + taskExecKey = "execution-name" + taskExecVersion = "execution-version" +) + +type CatalogClient struct { + client datacatalog.DataCatalogClient + store storage.ProtobufStore +} + +func (m *CatalogClient) getArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error) { + logger.Debugf(ctx, "Get Artifact by tag %v", tagName) + artifactQuery := &datacatalog.GetArtifactRequest{ + Dataset: dataset.Id, + QueryHandle: &datacatalog.GetArtifactRequest_TagName{ + TagName: tagName, + }, + } + response, err := 
m.client.GetArtifact(ctx, artifactQuery) + if err != nil { + return nil, err + } + + return response.Artifact, nil +} + +func (m *CatalogClient) getDataset(ctx context.Context, task *core.TaskTemplate) (*datacatalog.Dataset, error) { + datasetID, err := transformer.GenerateDatasetIDForTask(ctx, task) + if err != nil { + return nil, err + } + logger.Debugf(ctx, "Get Dataset %v", datasetID) + + dsQuery := &datacatalog.GetDatasetRequest{ + Dataset: datasetID, + } + + datasetResponse, err := m.client.GetDataset(ctx, dsQuery) + if err != nil { + return nil, err + } + + return datasetResponse.Dataset, nil +} + +func (m *CatalogClient) validateTask(task *core.TaskTemplate) error { + taskInterface := task.Interface + if taskInterface == nil { + return fmt.Errorf("Task interface cannot be nil, task: [%+v]", task) + } + + if task.Id == nil { + return fmt.Errorf("Task ID cannot be nil, task: [%+v]", task) + } + + if task.Metadata == nil { + return fmt.Errorf("Task metadata cannot be nil, task: [%+v]", task) + } + + return nil +} + +// Get the cached task execution from Catalog. 
+// These are the steps taken: +// - Verify there is a Dataset created for the Task +// - Lookup the Artifact that is tagged with the hash of the input values +// - The artifactData contains the literal values that serve as the task outputs +func (m *CatalogClient) Get(ctx context.Context, task *core.TaskTemplate, inputPath storage.DataReference) (*core.LiteralMap, error) { + inputs := &core.LiteralMap{} + + if err := m.validateTask(task); err != nil { + logger.Errorf(ctx, "DataCatalog task validation failed %+v, err: %+v", task, err) + return nil, err + } + + if task.Interface.Inputs != nil && len(task.Interface.Inputs.Variables) != 0 { + if err := m.store.ReadProtobuf(ctx, inputPath, inputs); err != nil { + logger.Errorf(ctx, "DataCatalog failed to read inputs %+v, err: %+v", inputPath, err) + return nil, err + } + logger.Debugf(ctx, "DataCatalog read inputs from %v", inputPath) + } + + dataset, err := m.getDataset(ctx, task) + if err != nil { + logger.Errorf(ctx, "DataCatalog failed to get dataset for task %+v, err: %+v", task, err) + return nil, err + } + + tag, err := transformer.GenerateArtifactTagName(ctx, inputs) + if err != nil { + logger.Errorf(ctx, "DataCatalog failed to generate tag for inputs %+v, err: %+v", inputs, err) + return nil, err + } + + artifact, err := m.getArtifactByTag(ctx, tag, dataset) + if err != nil { + logger.Errorf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err) + return nil, err + } + logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag) + + outputs, err := transformer.GenerateTaskOutputsFromArtifact(task, artifact) + if err != nil { + logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) + return nil, err + } + + logger.Debugf(ctx, "Cached %v artifact outputs from artifact %v", len(outputs.Literals), artifact.Id) + return outputs, nil +} + +// Catalog the task execution as a cached Artifact. 
We associate an Artifact as the cached data by tagging the Artifact +// with the hash of the input values. +// +// The steps taken to cache an execution: +// - Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task +// - Create an Artifact with the execution data that belongs to the dataset +// - Tag the Artifact with a hash generated by the input values +func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID *core.TaskExecutionIdentifier, inputPath storage.DataReference, outputPath storage.DataReference) error { + inputs := &core.LiteralMap{} + outputs := &core.LiteralMap{} + + if err := m.validateTask(task); err != nil { + logger.Errorf(ctx, "DataCatalog task validation failed %+v, err: %+v", task, err) + return err + } + + if task.Interface.Inputs != nil && len(task.Interface.Inputs.Variables) != 0 { + if err := m.store.ReadProtobuf(ctx, inputPath, inputs); err != nil { + logger.Errorf(ctx, "DataCatalog failed to read inputs %+v, err: %+v", inputPath, err) + return err + } + logger.Debugf(ctx, "DataCatalog read inputs from %v", inputPath) + } + + if task.Interface.Outputs != nil && len(task.Interface.Outputs.Variables) != 0 { + if err := m.store.ReadProtobuf(ctx, outputPath, outputs); err != nil { + logger.Errorf(ctx, "DataCatalog failed to read outputs %+v, err: %+v", outputPath, err) + return err + } + logger.Debugf(ctx, "DataCatalog read outputs from %v", outputPath) + } + + datasetID, err := transformer.GenerateDatasetIDForTask(ctx, task) + if err != nil { + logger.Errorf(ctx, "DataCatalog failed to generate dataset for task %+v, err: %+v", task, err) + return err + } + + logger.Debugf(ctx, "DataCatalog put into Catalog for DataSet %v", datasetID) + + // Try creating the dataset in case it doesn't exist + newDataset := &datacatalog.Dataset{ + Id: datasetID, + Metadata: &datacatalog.Metadata{ + KeyMap: map[string]string{ + taskVersionKey: task.Id.Version, + taskExecKey: 
execID.TaskId.Name, + }, + }, + } + + _, err = m.client.CreateDataset(ctx, &datacatalog.CreateDatasetRequest{Dataset: newDataset}) + if err != nil { + logger.Debugf(ctx, "Create dataset %v return err %v", datasetID, err) + + if status.Code(err) == codes.AlreadyExists { + logger.Debugf(ctx, "Create Dataset for task %v already exists", task.Id) + } else { + logger.Errorf(ctx, "Unable to create dataset %+v, err: %+v", datasetID, err) + return err + } + } + + // Create the artifact for the execution that belongs in the task + artifactDataList := make([]*datacatalog.ArtifactData, 0, len(outputs.Literals)) + for name, value := range outputs.Literals { + artifactData := &datacatalog.ArtifactData{ + Name: name, + Value: value, + } + artifactDataList = append(artifactDataList, artifactData) + } + + artifactMetadata := &datacatalog.Metadata{ + KeyMap: map[string]string{ + taskExecVersion: execID.TaskId.Version, + taskExecKey: execID.TaskId.Name, + }, + } + + cachedArtifact := &datacatalog.Artifact{ + Id: string(uuid.NewUUID()), + Dataset: datasetID, + Data: artifactDataList, + Metadata: artifactMetadata, + } + + createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} + _, err = m.client.CreateArtifact(ctx, createArtifactRequest) + if err != nil { + logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact, err) + return err + } + logger.Debugf(ctx, "Created artifact: %v, with %v outputs from execution %v", cachedArtifact.Id, len(artifactDataList), execID.TaskId.Name) + + // Tag the artifact since it is the cached artifact + tagName, err := transformer.GenerateArtifactTagName(ctx, inputs) + if err != nil { + logger.Errorf(ctx, "Failed to create tag for artifact %+v, err: %+v", cachedArtifact.Id, err) + return err + } + logger.Debugf(ctx, "Created tag: %v, for task: %v", tagName, task.Id) + + // TODO: We should create the artifact + tag in a transaction when the service supports that + tag := &datacatalog.Tag{ + Name: tagName, + 
Dataset: datasetID, + ArtifactId: cachedArtifact.Id, + } + _, err = m.client.AddTag(ctx, &datacatalog.AddTagRequest{Tag: tag}) + if err != nil { + if status.Code(err) == codes.AlreadyExists { + logger.Errorf(ctx, "Tag %v already exists for Artifact %v (idempotent)", tagName, cachedArtifact.Id) + } + + logger.Errorf(ctx, "Failed to add tag %+v for artifact %+v, err: %+v", tagName, cachedArtifact.Id, err) + return err + } + + return nil +} + +func NewDataCatalog(ctx context.Context, endpoint string, secureConnection bool, datastore storage.ProtobufStore) (*CatalogClient, error) { + var opts []grpc.DialOption + + grpcOptions := []grpc_retry.CallOption{ + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100 * time.Millisecond)), + grpc_retry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled), + grpc_retry.WithMax(5), + } + + if secureConnection { + pool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + + creds := credentials.NewClientTLSFromCert(pool, "") + opts = append(opts, grpc.WithTransportCredentials(creds)) + } + + retryInterceptor := grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpcOptions...)) + + opts = append(opts, retryInterceptor) + clientConn, err := grpc.Dial(endpoint, opts...) 
+ if err != nil { + return nil, err + } + + client := datacatalog.NewDataCatalogClient(clientConn) + + return &CatalogClient{ + client: client, + store: datastore, + }, nil +} diff --git a/pkg/controller/catalog/datacatalog/datacatalog_test.go b/pkg/controller/catalog/datacatalog/datacatalog_test.go new file mode 100644 index 0000000000..d25b94165a --- /dev/null +++ b/pkg/controller/catalog/datacatalog/datacatalog_test.go @@ -0,0 +1,427 @@ +package datacatalog + +import ( + "context" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/google/uuid" + datacatalog "github.com/lyft/datacatalog/protos/gen" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/controller/catalog/datacatalog/mocks" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flytestdlib/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) +} + +func createInmemoryStore(t testing.TB) *storage.DataStore { + cfg := storage.Config{ + Type: storage.TypeMemory, + } + + d, err := storage.NewDataStore(&cfg, promutils.NewTestScope()) + assert.NoError(t, err) + + return d +} + +func newStringLiteral(value string) *core.Literal { + return &core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: value, + }, + }, + }, + }, + }, + } +} + +var sampleParameters = &core.LiteralMap{Literals: map[string]*core.Literal{ + "out1": newStringLiteral("output1-stringval"), +}} + +var variableMap = &core.VariableMap{ + Variables: map[string]*core.Variable{ + "test": &core.Variable{ + Type: 
&core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_STRING, + }, + }, + }, + }, +} + +var typedInterface = &core.TypedInterface{ + Inputs: variableMap, + Outputs: variableMap, +} + +var sampleTask = &core.TaskTemplate{ + Id: &core.Identifier{Project: "project", Domain: "domain", Name: "name"}, + Interface: typedInterface, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, +} + +var noInputOutputTask = &core.TaskTemplate{ + Id: &core.Identifier{Project: "project", Domain: "domain", Name: "name"}, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, + Interface: &core.TypedInterface{}, +} + +var datasetID = &datacatalog.DatasetID{ + Project: "project", + Domain: "domain", + Name: "flyte_task-name", + Version: "1.0.0-ue5g6uuI-ue5g6uuI", +} + +func assertGrpcErr(t *testing.T, err error, code codes.Code) { + assert.Equal(t, code, status.Code(err)) +} + +func TestCatalog_Get(t *testing.T) { + + ctx := context.Background() + testFile := storage.DataReference("test-data.pb") + + t.Run("Empty interface returns err", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + mockClient := &mocks.DataCatalogClient{} + catalogClient := &CatalogClient{ + client: mockClient, + store: dataStore, + } + taskWithoutInterface := &core.TaskTemplate{ + Id: &core.Identifier{Project: "project", Domain: "domain", Name: "name"}, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, + } + _, err := catalogClient.Get(ctx, taskWithoutInterface, testFile) + assert.Error(t, err) + }) + + t.Run("No results, no Dataset", func(t *testing.T) { + dataStore := createInmemoryStore(t) + err := dataStore.WriteProtobuf(ctx, testFile, storage.Options{}, newStringLiteral("output")) + assert.NoError(t, err) + + mockClient := &mocks.DataCatalogClient{} + catalogClient := &CatalogClient{ + client: mockClient, + store: dataStore, + } + mockClient.On("GetDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool { + 
assert.EqualValues(t, datasetID, o.Dataset) + return true + }), + ).Return(nil, status.Error(codes.NotFound, "test not found")) + resp, err := catalogClient.Get(ctx, sampleTask, testFile) + assert.Error(t, err) + + assertGrpcErr(t, err, codes.NotFound) + assert.Nil(t, resp) + }) + + t.Run("No results, no Artifact", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + err := dataStore.WriteProtobuf(ctx, testFile, storage.Options{}, sampleParameters) + assert.NoError(t, err) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + store: dataStore, + } + + sampleDataSet := &datacatalog.Dataset{ + Id: datasetID, + } + mockClient.On("GetDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool { + assert.EqualValues(t, datasetID, o.Dataset) + return true + }), + ).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil, "") + + mockClient.On("GetArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.GetArtifactRequest) bool { + return true + }), + ).Return(nil, status.Error(codes.NotFound, "")) + + outputs, err := discovery.Get(ctx, sampleTask, testFile) + assert.Nil(t, outputs) + assert.Error(t, err) + }) + + t.Run("Found w/ tag", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + err := dataStore.WriteProtobuf(ctx, testFile, storage.Options{}, sampleParameters) + assert.NoError(t, err) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + store: dataStore, + } + + sampleDataSet := &datacatalog.Dataset{ + Id: datasetID, + } + + mockClient.On("GetDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool { + assert.EqualValues(t, datasetID, o.Dataset) + return true + }), + ).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil) + + sampleArtifactData := &datacatalog.ArtifactData{ + Name: "test", + Value: newStringLiteral("output1-stringval"), + } + sampleArtifact := &datacatalog.Artifact{ + Id: 
"test-artifact", + Dataset: sampleDataSet.Id, + Data: []*datacatalog.ArtifactData{sampleArtifactData}, + } + mockClient.On("GetArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.GetArtifactRequest) bool { + assert.EqualValues(t, datasetID, o.Dataset) + assert.Equal(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.GetTagName()) + return true + }), + ).Return(&datacatalog.GetArtifactResponse{Artifact: sampleArtifact}, nil) + + resp, err := discovery.Get(ctx, sampleTask, testFile) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + + t.Run("Found w/ tag no inputs or outputs", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + store: dataStore, + } + + sampleDataSet := &datacatalog.Dataset{ + Id: &datacatalog.DatasetID{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "1.0.0-V-K42BDF-V-K42BDF", + }, + } + + mockClient.On("GetDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool { + assert.EqualValues(t, "1.0.0-V-K42BDF-V-K42BDF", o.Dataset.Version) + return true + }), + ).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil) + + sampleArtifact := &datacatalog.Artifact{ + Id: "test-artifact", + Dataset: sampleDataSet.Id, + Data: []*datacatalog.ArtifactData{}, + } + mockClient.On("GetArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.GetArtifactRequest) bool { + assert.EqualValues(t, "1.0.0-V-K42BDF-V-K42BDF", o.Dataset.Version) + assert.Equal(t, "flyte_cached-m4vFNUOHOFEFIiZSyOyid92TkWFFBDha4UOkkBb47XU", o.GetTagName()) + return true + }), + ).Return(&datacatalog.GetArtifactResponse{Artifact: sampleArtifact}, nil) + + resp, err := discovery.Get(ctx, noInputOutputTask, testFile) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Len(t, resp.Literals, 0) + }) +} + +func TestCatalog_Put(t *testing.T) { + ctx := context.Background() + + execID := 
&core.TaskExecutionIdentifier{ + NodeExecutionId: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "runID", + }, + }, + TaskId: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "taskRunName", + Version: "taskRunVersion", + }, + } + + testFile := storage.DataReference("test-data.pb") + + t.Run("Create new cached execution", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + err := dataStore.WriteProtobuf(ctx, testFile, storage.Options{}, sampleParameters) + assert.NoError(t, err) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + store: dataStore, + } + + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.True(t, proto.Equal(o.Dataset.Id, datasetID)) + return true + }), + ).Return(&datacatalog.CreateDatasetResponse{}, nil) + + mockClient.On("CreateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool { + _, parseErr := uuid.Parse(o.Artifact.Id) + assert.NoError(t, parseErr) + assert.EqualValues(t, 1, len(o.Artifact.Data)) + assert.EqualValues(t, "out1", o.Artifact.Data[0].Name) + assert.EqualValues(t, newStringLiteral("output1-stringval"), o.Artifact.Data[0].Value) + return true + }), + ).Return(&datacatalog.CreateArtifactResponse{}, nil) + + mockClient.On("AddTag", + ctx, + mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool { + assert.EqualValues(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.Tag.Name) + return true + }), + ).Return(&datacatalog.AddTagResponse{}, nil) + err = discovery.Put(ctx, sampleTask, execID, testFile, testFile) + assert.NoError(t, err) + }) + + t.Run("Create new cached execution with no inputs/outputs", func(t *testing.T) { + dataStore := createInmemoryStore(t) + mockClient := &mocks.DataCatalogClient{} + catalogClient := &CatalogClient{ + client: mockClient, + store: 
dataStore, + } + + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.Equal(t, "1.0.0-V-K42BDF-V-K42BDF", o.Dataset.Id.Version) + return true + }), + ).Return(&datacatalog.CreateDatasetResponse{}, nil) + + mockClient.On("CreateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool { + assert.EqualValues(t, 0, len(o.Artifact.Data)) + return true + }), + ).Return(&datacatalog.CreateArtifactResponse{}, nil) + + mockClient.On("AddTag", + ctx, + mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool { + assert.EqualValues(t, "flyte_cached-m4vFNUOHOFEFIiZSyOyid92TkWFFBDha4UOkkBb47XU", o.Tag.Name) + return true + }), + ).Return(&datacatalog.AddTagResponse{}, nil) + err := catalogClient.Put(ctx, noInputOutputTask, execID, "", "") + assert.NoError(t, err) + }) + + t.Run("Create new cached execution with existing dataset", func(t *testing.T) { + dataStore := createInmemoryStore(t) + + err := dataStore.WriteProtobuf(ctx, testFile, storage.Options{}, sampleParameters) + assert.NoError(t, err) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + store: dataStore, + } + + createDatasetCalled := false + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + createDatasetCalled = true + return true + }), + ).Return(nil, status.Error(codes.AlreadyExists, "test dataset already exists")) + + createArtifactCalled := false + mockClient.On("CreateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool { + _, parseErr := uuid.Parse(o.Artifact.Id) + assert.NoError(t, parseErr) + assert.EqualValues(t, 1, len(o.Artifact.Data)) + assert.EqualValues(t, "out1", o.Artifact.Data[0].Name) + assert.EqualValues(t, newStringLiteral("output1-stringval"), o.Artifact.Data[0].Value) + createArtifactCalled = true + return true + }), + ).Return(&datacatalog.CreateArtifactResponse{}, nil) + + 
addTagCalled := false + mockClient.On("AddTag", + ctx, + mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool { + assert.EqualValues(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.Tag.Name) + addTagCalled = true + return true + }), + ).Return(&datacatalog.AddTagResponse{}, nil) + err = discovery.Put(ctx, sampleTask, execID, testFile, testFile) + assert.NoError(t, err) + assert.True(t, createDatasetCalled) + assert.True(t, createArtifactCalled) + assert.True(t, addTagCalled) + }) + +} diff --git a/pkg/controller/catalog/datacatalog/mocks/DataCatalogClient.go b/pkg/controller/catalog/datacatalog/mocks/DataCatalogClient.go new file mode 100644 index 0000000000..ae5a7f8e4b --- /dev/null +++ b/pkg/controller/catalog/datacatalog/mocks/DataCatalogClient.go @@ -0,0 +1,163 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import datacatalog "github.com/lyft/datacatalog/protos/gen" +import grpc "google.golang.org/grpc" +import mock "github.com/stretchr/testify/mock" + +// DataCatalogClient is an autogenerated mock type for the DataCatalogClient type +type DataCatalogClient struct { + mock.Mock +} + +// AddTag provides a mock function with given fields: ctx, in, opts +func (_m *DataCatalogClient) AddTag(ctx context.Context, in *datacatalog.AddTagRequest, opts ...grpc.CallOption) (*datacatalog.AddTagResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datacatalog.AddTagResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.AddTagRequest, ...grpc.CallOption) *datacatalog.AddTagResponse); ok { + r0 = rf(ctx, in, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.AddTagResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.AddTagRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateArtifact provides a mock function with given fields: ctx, in, opts +func (_m *DataCatalogClient) CreateArtifact(ctx context.Context, in *datacatalog.CreateArtifactRequest, opts ...grpc.CallOption) (*datacatalog.CreateArtifactResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datacatalog.CreateArtifactResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.CreateArtifactRequest, ...grpc.CallOption) *datacatalog.CreateArtifactResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.CreateArtifactResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.CreateArtifactRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateDataset provides a mock function with given fields: ctx, in, opts +func (_m *DataCatalogClient) CreateDataset(ctx context.Context, in *datacatalog.CreateDatasetRequest, opts ...grpc.CallOption) (*datacatalog.CreateDatasetResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datacatalog.CreateDatasetResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.CreateDatasetRequest, ...grpc.CallOption) *datacatalog.CreateDatasetResponse); ok { + r0 = rf(ctx, in, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.CreateDatasetResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.CreateDatasetRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetArtifact provides a mock function with given fields: ctx, in, opts +func (_m *DataCatalogClient) GetArtifact(ctx context.Context, in *datacatalog.GetArtifactRequest, opts ...grpc.CallOption) (*datacatalog.GetArtifactResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datacatalog.GetArtifactResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.GetArtifactRequest, ...grpc.CallOption) *datacatalog.GetArtifactResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.GetArtifactResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.GetArtifactRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetDataset provides a mock function with given fields: ctx, in, opts +func (_m *DataCatalogClient) GetDataset(ctx context.Context, in *datacatalog.GetDatasetRequest, opts ...grpc.CallOption) (*datacatalog.GetDatasetResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datacatalog.GetDatasetResponse + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.GetDatasetRequest, ...grpc.CallOption) *datacatalog.GetDatasetResponse); ok { + r0 = rf(ctx, in, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.GetDatasetResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.GetDatasetRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go new file mode 100644 index 0000000000..6f66b24f40 --- /dev/null +++ b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go @@ -0,0 +1,144 @@ +package transformer + +import ( + "context" + "fmt" + "reflect" + + "encoding/base64" + + datacatalog "github.com/lyft/datacatalog/protos/gen" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/compiler/validators" + "github.com/lyft/flytestdlib/pbhash" +) + +const cachedTaskTag = "flyte_cached" +const taskNamespace = "flyte_task" +const maxParamHashLength = 8 + +// Declare the definition of empty literal and variable maps. This is important because we hash against +// the literal and variable maps. So Nil and empty literals and variable maps should translate to these definitions +// in order to have a consistent hash.
+var emptyLiteralMap = core.LiteralMap{Literals: map[string]*core.Literal{}} +var emptyVariableMap = core.VariableMap{Variables: map[string]*core.Variable{}} + +func getDatasetNameFromTask(task *core.TaskTemplate) string { + return fmt.Sprintf("%s-%s", taskNamespace, task.Id.Name) +} + +func GenerateTaskOutputsFromArtifact(task *core.TaskTemplate, artifact *datacatalog.Artifact) (*core.LiteralMap, error) { + // if there are no outputs in the task, return empty map + if task.Interface.Outputs == nil || len(task.Interface.Outputs.Variables) == 0 { + return &emptyLiteralMap, nil + } + + outputVariables := task.Interface.Outputs.Variables + artifactDataList := artifact.Data + + // verify the task outputs matches what is stored in ArtifactData + if len(outputVariables) != len(artifactDataList) { + return nil, fmt.Errorf("The task %v with %d outputs, should have %d artifactData for artifact %v", + task.Id, len(outputVariables), len(artifactDataList), artifact.Id) + } + + outputs := make(map[string]*core.Literal, len(artifactDataList)) + for _, artifactData := range artifactDataList { + // verify that the name and type of artifactData matches what is expected from the interface + if _, ok := outputVariables[artifactData.Name]; !ok { + return nil, fmt.Errorf("Unexpected artifactData with name [%v] does not match any task output variables %v", artifactData.Name, reflect.ValueOf(outputVariables).MapKeys()) + } + + expectedVarType := outputVariables[artifactData.Name].GetType() + inputType := validators.LiteralTypeForLiteral(artifactData.Value) + if !validators.AreTypesCastable(inputType, expectedVarType) { + return nil, fmt.Errorf("Unexpected artifactData: [%v] type: [%v] does not match any task output type: [%v]", artifactData.Name, inputType, expectedVarType) + } + + outputs[artifactData.Name] = artifactData.Value + } + + return &core.LiteralMap{Literals: outputs}, nil +} + +func generateDataSetVersionFromTask(ctx context.Context, task *core.TaskTemplate) (string, error) { 
+ signatureHash, err := generateTaskSignatureHash(ctx, task) + if err != nil { + return "", err + } + + cacheVersion := task.Metadata.DiscoveryVersion + if len(cacheVersion) == 0 { + return "", fmt.Errorf("Task cannot have an empty discoveryVersion %v", cacheVersion) + } + return fmt.Sprintf("%s-%s", cacheVersion, signatureHash), nil +} + +func generateTaskSignatureHash(ctx context.Context, task *core.TaskTemplate) (string, error) { + taskInputs := &emptyVariableMap + taskOutputs := &emptyVariableMap + + if task.Interface.Inputs != nil && len(task.Interface.Inputs.Variables) != 0 { + taskInputs = task.Interface.Inputs + } + + if task.Interface.Outputs != nil && len(task.Interface.Outputs.Variables) != 0 { + taskOutputs = task.Interface.Outputs + } + + inputHash, err := pbhash.ComputeHash(ctx, taskInputs) + if err != nil { + return "", err + } + + outputHash, err := pbhash.ComputeHash(ctx, taskOutputs) + if err != nil { + return "", err + } + + inputHashString := base64.RawURLEncoding.EncodeToString(inputHash) + + if len(inputHashString) > maxParamHashLength { + inputHashString = inputHashString[0:maxParamHashLength] + } + + outputHashString := base64.RawURLEncoding.EncodeToString(outputHash) + if len(outputHashString) > maxParamHashLength { + outputHashString = outputHashString[0:maxParamHashLength] + } + + return fmt.Sprintf("%v-%v", inputHashString, outputHashString), nil +} + +func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error) { + if inputs == nil || len(inputs.Literals) == 0 { + inputs = &emptyLiteralMap + } + + inputsHash, err := pbhash.ComputeHash(ctx, inputs) + if err != nil { + return "", err + } + + hashString := base64.RawURLEncoding.EncodeToString(inputsHash) + tag := fmt.Sprintf("%s-%s", cachedTaskTag, hashString) + return tag, nil +} + +// Get the DataSetID for a task. +// NOTE: the version of the task is a combination of both the discoverable_version and the task signature. 
+// This is because the interface may have changed even if the discoverable_version hadn't. +func GenerateDatasetIDForTask(ctx context.Context, task *core.TaskTemplate) (*datacatalog.DatasetID, error) { + datasetVersion, err := generateDataSetVersionFromTask(ctx, task) + if err != nil { + return nil, err + } + + datasetID := &datacatalog.DatasetID{ + Project: task.Id.Project, + Domain: task.Id.Domain, + Name: getDatasetNameFromTask(task), + Version: datasetVersion, + } + return datasetID, nil +} diff --git a/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer_test.go b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer_test.go new file mode 100644 index 0000000000..39cd764618 --- /dev/null +++ b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer_test.go @@ -0,0 +1,132 @@ +package transformer + +import ( + "context" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/utils" + "github.com/stretchr/testify/assert" +) + +// add test for rearranged Literal maps for input values + +func TestNilParamTask(t *testing.T) { + task := &core.TaskTemplate{ + Id: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "1.0.0", + }, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, + Interface: &core.TypedInterface{ + Inputs: nil, + Outputs: nil, + }, + } + datasetID, err := GenerateDatasetIDForTask(context.TODO(), task) + assert.NoError(t, err) + assert.NotEmpty(t, datasetID.Version) + assert.Equal(t, "1.0.0-V-K42BDF-V-K42BDF", datasetID.Version) +} + +// Ensure that empty parameters generate the same dataset as nil parameters +func TestEmptyParamTask(t *testing.T) { + task := &core.TaskTemplate{ + Id: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "1.0.0", + }, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, + Interface: 
&core.TypedInterface{ + Inputs: &core.VariableMap{}, + Outputs: &core.VariableMap{}, + }, + } + datasetID, err := GenerateDatasetIDForTask(context.TODO(), task) + assert.NoError(t, err) + assert.NotEmpty(t, datasetID.Version) + assert.Equal(t, "1.0.0-V-K42BDF-V-K42BDF", datasetID.Version) + + task.Interface.Inputs = nil + task.Interface.Outputs = nil + datasetIDDupe, err := GenerateDatasetIDForTask(context.TODO(), task) + assert.NoError(t, err) + assert.True(t, proto.Equal(datasetIDDupe, datasetID)) +} + +// Ensure the key order on the map generates the same dataset +func TestVariableMapOrder(t *testing.T) { + task := &core.TaskTemplate{ + Id: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "1.0.0", + }, + Metadata: &core.TaskMetadata{ + DiscoveryVersion: "1.0.0", + }, + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "1": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + "2": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + }, + }, + }, + } + datasetID, err := GenerateDatasetIDForTask(context.TODO(), task) + assert.NoError(t, err) + assert.NotEmpty(t, datasetID.Version) + assert.Equal(t, "1.0.0-UxVtPm0k-V-K42BDF", datasetID.Version) + + task.Interface.Inputs = &core.VariableMap{ + Variables: map[string]*core.Variable{ + "2": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + "1": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + }, + } + datasetIDDupe, err := GenerateDatasetIDForTask(context.TODO(), task) + assert.NoError(t, err) + + assert.Equal(t, "1.0.0-UxVtPm0k-V-K42BDF", datasetIDDupe.Version) + assert.True(t, proto.Equal(datasetID, datasetIDDupe)) +} + +// Ensure the key order on the inputs generates the same tag +func TestInputValueSorted(t *testing.T) { + literalMap, err := 
utils.MakeLiteralMap(map[string]interface{}{"1": 1, "2": 2}) + assert.NoError(t, err) + + tag, err := GenerateArtifactTagName(context.TODO(), literalMap) + assert.NoError(t, err) + assert.Equal(t, "flyte_cached-GQid5LjHbakcW68DS3P2jp80QLbiF0olFHF2hTh5bg8", tag) + + literalMap, err = utils.MakeLiteralMap(map[string]interface{}{"2": 2, "1": 1}) + assert.NoError(t, err) + + tagDupe, err := GenerateArtifactTagName(context.TODO(), literalMap) + assert.NoError(t, err) + assert.Equal(t, tagDupe, tag) +} + +// Ensure that empty inputs are hashed the same way +func TestNoInputValues(t *testing.T) { + tag, err := GenerateArtifactTagName(context.TODO(), nil) + assert.NoError(t, err) + assert.Equal(t, "flyte_cached-m4vFNUOHOFEFIiZSyOyid92TkWFFBDha4UOkkBb47XU", tag) + + tagDupe, err := GenerateArtifactTagName(context.TODO(), &core.LiteralMap{Literals: nil}) + assert.NoError(t, err) + assert.Equal(t, "flyte_cached-m4vFNUOHOFEFIiZSyOyid92TkWFFBDha4UOkkBb47XU", tagDupe) + assert.Equal(t, tagDupe, tag) +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 25d8fb4316..d805efee49 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -274,7 +274,10 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter } logger.Info(ctx, "Setting up Catalog client.") - catalogClient := catalog.NewCatalogClient(store) + catalogClient, err := catalog.NewCatalogClient(ctx, store) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create datacatalog client") + } workQ, err := NewCompositeWorkQueue(ctx, cfg.Queue, scope) if err != nil { diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 2cfcc4caff..56d7d8c53f 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -59,7 +59,7 @@ func init() { func TestSetInputsForStartNode(t *testing.T) { ctx := context.Background() mockStorage := createInmemoryDataStore(t, 
testScope.NewSubScope("f")) - catalogClient := catalog.NewCatalogClient(mockStorage) + catalogClient, _ := catalog.NewCatalogClient(ctx, mockStorage) enQWf := func(workflowID v1alpha1.WorkflowID) {} factory := createSingletonTaskExecutorFactory() @@ -138,7 +138,7 @@ func TestNodeExecutor_TransitionToPhase(t *testing.T) { memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) - catalogClient := catalog.NewCatalogClient(memStore) + catalogClient, _ := catalog.NewCatalogClient(ctx, memStore) execIface, err := NewExecutor(ctx, memStore, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -349,7 +349,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) - catalogClient := catalog.NewCatalogClient(memStore) + catalogClient, _ := catalog.NewCatalogClient(ctx, memStore) execIface, err := NewExecutor(ctx, memStore, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) @@ -368,7 +368,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { assert.True(t, task.IsTestModeEnabled()) store := createInmemoryDataStore(t, promutils.NewTestScope()) - catalogClient := catalog.NewCatalogClient(store) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) execIface, err := NewExecutor(ctx, store, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) @@ -472,7 +472,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { assert.True(t, task.IsTestModeEnabled()) store := createInmemoryDataStore(t, 
promutils.NewTestScope()) - catalogClient := catalog.NewCatalogClient(store) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) execIface, err := NewExecutor(ctx, store, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) @@ -660,7 +660,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { assert.True(t, task.IsTestModeEnabled()) store := createInmemoryDataStore(t, promutils.NewTestScope()) - catalogClient := catalog.NewCatalogClient(store) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) execIface, err := NewExecutor(ctx, store, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) @@ -961,7 +961,7 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { assert.True(t, task.IsTestModeEnabled()) store := createInmemoryDataStore(t, promutils.NewTestScope()) - catalogClient := catalog.NewCatalogClient(store) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) execIface, err := NewExecutor(ctx, store, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) @@ -1334,7 +1334,7 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { assert.True(t, task.IsTestModeEnabled()) store := createInmemoryDataStore(t, promutils.NewTestScope()) - catalogClient := catalog.NewCatalogClient(store) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) execIface, err := NewExecutor(ctx, store, enQWf, time.Second, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index b118107edd..dc0776a17d 100644 --- 
a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -215,25 +215,26 @@ func (h *taskHandler) StartNode(ctx context.Context, w v1alpha1.ExecutableWorkfl } logger.Infof(ctx, "Executor type: [%v]. Properties: finalizer[%v]. disable[%v].", reflect.TypeOf(t).String(), t.GetProperties().RequiresFinalizer, t.GetProperties().DisableNodeLevelCaching) - if task.CoreTask().Metadata.Discoverable { + if iface := task.CoreTask().Interface; task.CoreTask().Metadata.Discoverable && iface != nil && iface.Outputs != nil && len(iface.Outputs.Variables) > 0 { if t.GetProperties().DisableNodeLevelCaching { logger.Infof(ctx, "Executor has Node-Level caching disabled. Skipping.") } else if resp, err := h.catalogClient.Get(ctx, task.CoreTask(), taskCtx.GetInputsFile()); err != nil { if taskStatus, ok := status.FromError(err); ok && taskStatus.Code() == codes.NotFound { h.metrics.discoveryMissCount.Inc(ctx) - logger.Infof(ctx, "Artifact not found in Discovery. Executing Task.") + logger.Infof(ctx, "Artifact not found in cache. Executing Task.") } else { h.metrics.discoveryGetFailureCount.Inc(ctx) - logger.Errorf(ctx, "Discovery check failed. Executing Task. Err: %v", err.Error()) + logger.Errorf(ctx, "Catalog cache check failed. Executing Task. 
Err: %v", err.Error()) } } else if resp != nil { h.metrics.discoveryHitCount.Inc(ctx) - if iface := task.CoreTask().Interface; iface != nil && iface.Outputs != nil && len(iface.Outputs.Variables) > 0 { - if err := h.store.WriteProtobuf(ctx, taskCtx.GetOutputsFile(), storage.Options{}, resp); err != nil { - logger.Errorf(ctx, "failed to write data to Storage, err: %v", err.Error()) - return handler.StatusUndefined, errors.Wrapf(errors.CausedByError, node.GetID(), err, "failed to copy cached results for task.") - } + + logger.Debugf(ctx, "Outputs found in Catalog cache %+v", resp) + if err := h.store.WriteProtobuf(ctx, taskCtx.GetOutputsFile(), storage.Options{}, resp); err != nil { + logger.Errorf(ctx, "failed to write data to Storage, err: %v", err.Error()) + return handler.StatusUndefined, errors.Wrapf(errors.CausedByError, node.GetID(), err, "failed to copy cached results for task.") } + // SetCached. w.GetNodeExecutionStatus(node.GetID()).SetCached() return handler.StatusSuccess, nil @@ -344,7 +345,7 @@ func (h *taskHandler) HandleNodeSuccess(ctx context.Context, w v1alpha1.Executab h.metrics.discoveryPutFailureCount.Inc(ctx) logger.Errorf(ctx, "Failed to write results to catalog. 
Err: %v", err2) } else { - logger.Debugf(ctx, "Successfully cached results to discovery - Task [%s]", task.CoreTask().GetId()) + logger.Debugf(ctx, "Successfully cached results - Task [%s]", task.CoreTask().GetId()) } } } diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index 492e4067dd..39bd812917 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -231,7 +231,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {} eventSink := events.NewMockEventSink() - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, promutils.NewTestScope()) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -280,7 +281,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {} eventSink := events.NewMockEventSink() - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, promutils.NewTestScope()) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, 
"", nodeExec, promutils.NewTestScope()) @@ -330,7 +332,8 @@ func BenchmarkWorkflowExecutor(b *testing.B) { enqueueWorkflow := func(workflowId v1alpha1.WorkflowID) {} eventSink := events.NewMockEventSink() - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, scope) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, scope) assert.NoError(b, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -415,7 +418,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { } return nil } - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, promutils.NewTestScope()) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -502,7 +506,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { } return nil } - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, promutils.NewTestScope()) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, eventSink, 
launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -553,7 +558,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { assert.NoError(t, err) nodeEventSink := events.NewMockEventSink() - nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalog.NewCatalogClient(store), fakeKubeClient, promutils.NewTestScope()) + catalogClient, _ := catalog.NewCatalogClient(ctx, store) + nodeExec, err := nodes.NewExecutor(ctx, store, enqueueWorkflow, time.Second, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), catalogClient, fakeKubeClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) { From 8ba3946178fd0b7c8ceb413b05bb909fc94def3c Mon Sep 17 00:00:00 2001 From: Andrew Chan Date: Mon, 9 Sep 2019 17:07:36 -0700 Subject: [PATCH 2/5] Correct metadata for execution name --- .../catalog/datacatalog/datacatalog.go | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/pkg/controller/catalog/datacatalog/datacatalog.go b/pkg/controller/catalog/datacatalog/datacatalog.go index 672da3f3f5..1cdea9b816 100644 --- a/pkg/controller/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/catalog/datacatalog/datacatalog.go @@ -21,11 +21,11 @@ import ( ) const ( - taskVersionKey = "task-version" - taskExecKey = "execution-name" - taskExecVersion = "execution-version" + taskVersionKey = "task-version" + taskExecKey = "execution-name" ) +// This is the client that caches task executions to DataCatalog service. 
type CatalogClient struct { client datacatalog.DataCatalogClient store storage.ProtobufStore @@ -174,15 +174,17 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID logger.Debugf(ctx, "DataCatalog put into Catalog for DataSet %v", datasetID) // Try creating the dataset in case it doesn't exist - newDataset := &datacatalog.Dataset{ - Id: datasetID, - Metadata: &datacatalog.Metadata{ - KeyMap: map[string]string{ - taskVersionKey: task.Id.Version, - taskExecKey: execID.TaskId.Name, - }, + + metadata := &datacatalog.Metadata{ + KeyMap: map[string]string{ + taskVersionKey: task.Id.Version, + taskExecKey: execID.NodeExecutionId.NodeId, }, } + newDataset := &datacatalog.Dataset{ + Id: datasetID, + Metadata: metadata, + } _, err = m.client.CreateDataset(ctx, &datacatalog.CreateDatasetRequest{Dataset: newDataset}) if err != nil { @@ -206,18 +208,11 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID artifactDataList = append(artifactDataList, artifactData) } - artifactMetadata := &datacatalog.Metadata{ - KeyMap: map[string]string{ - taskExecVersion: execID.TaskId.Version, - taskExecKey: execID.TaskId.Name, - }, - } - cachedArtifact := &datacatalog.Artifact{ Id: string(uuid.NewUUID()), Dataset: datasetID, Data: artifactDataList, - Metadata: artifactMetadata, + Metadata: metadata, } createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} From 2404756d96964ae5bce9c7f0fc315b39b1f7787c Mon Sep 17 00:00:00 2001 From: Andrew Chan Date: Tue, 10 Sep 2019 10:19:21 -0700 Subject: [PATCH 3/5] Specify insecure connection with config --- pkg/controller/catalog/catalog_client.go | 2 +- pkg/controller/catalog/config.go | 2 +- pkg/controller/catalog/config_flags.go | 4 ++-- pkg/controller/catalog/config_flags_test.go | 14 +++++++------- pkg/controller/catalog/datacatalog/datacatalog.go | 8 ++++++-- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git 
a/pkg/controller/catalog/catalog_client.go b/pkg/controller/catalog/catalog_client.go index af24c2b4d0..67a196898b 100644 --- a/pkg/controller/catalog/catalog_client.go +++ b/pkg/controller/catalog/catalog_client.go @@ -25,7 +25,7 @@ func NewCatalogClient(ctx context.Context, store storage.ProtobufStore) (Client, case LegacyDiscoveryType: catalogClient = NewLegacyDiscovery(catalogConfig.Endpoint, store) case DataCatalogType: - catalogClient, err = datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Secure, store) + catalogClient, err = datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, store) if err != nil { return nil, err } diff --git a/pkg/controller/catalog/config.go b/pkg/controller/catalog/config.go index 6d4f2c3376..d067876f03 100644 --- a/pkg/controller/catalog/config.go +++ b/pkg/controller/catalog/config.go @@ -27,7 +27,7 @@ const ( type Config struct { Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` - Secure bool `json:"secure" pflag:"true, Connect with TSL/SSL"` + Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` } // Gets loaded config for Discovery diff --git a/pkg/controller/catalog/config_flags.go b/pkg/controller/catalog/config_flags.go index b2359a462d..731c2347f1 100755 --- a/pkg/controller/catalog/config_flags.go +++ b/pkg/controller/catalog/config_flags.go @@ -1,6 +1,6 @@ // Code generated by go generate; DO NOT EDIT. 
// This file was generated by robots at -// 2019-09-05 05:37:07.301294018 -0700 PDT m=+14.696460456 +// 2019-09-10 10:40:36.580780957 -0700 PDT m=+11.375426414 package catalog @@ -16,6 +16,6 @@ func (Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) cmdFlags.String(fmt.Sprintf("%v%v", prefix, "type"), "noop", " Catalog Implementation to use") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "endpoint"), "", " Endpoint for catalog service") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "secure"), true, " Connect with TSL/SSL") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "insecure"), false, " Use insecure grpc connection") return cmdFlags } diff --git a/pkg/controller/catalog/config_flags_test.go b/pkg/controller/catalog/config_flags_test.go index e5ce16b044..5d01a62377 100755 --- a/pkg/controller/catalog/config_flags_test.go +++ b/pkg/controller/catalog/config_flags_test.go @@ -1,6 +1,6 @@ // Code generated by go generate; DO NOT EDIT. 
// This file was generated by robots at -// 2019-09-05 05:37:07.301294018 -0700 PDT m=+14.696460456 +// 2019-09-10 10:40:36.580780957 -0700 PDT m=+11.375426414 package catalog @@ -129,20 +129,20 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_secure", func(t *testing.T) { + t.Run("Test_insecure", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vBool, err := cmdFlags.GetBool("secure"); err == nil { - assert.Equal(t, true, vBool) + if vBool, err := cmdFlags.GetBool("insecure"); err == nil { + assert.Equal(t, false, vBool) } else { assert.FailNow(t, err.Error()) } }) t.Run("Override", func(t *testing.T) { - cmdFlags.Set("secure", "1") - if vBool, err := cmdFlags.GetBool("secure"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.Secure) + cmdFlags.Set("insecure", "1") + if vBool, err := cmdFlags.GetBool("insecure"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.Insecure) } else { assert.FailNow(t, err.Error()) diff --git a/pkg/controller/catalog/datacatalog/datacatalog.go b/pkg/controller/catalog/datacatalog/datacatalog.go index 1cdea9b816..73db43b221 100644 --- a/pkg/controller/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/catalog/datacatalog/datacatalog.go @@ -250,7 +250,7 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID return nil } -func NewDataCatalog(ctx context.Context, endpoint string, secureConnection bool, datastore storage.ProtobufStore) (*CatalogClient, error) { +func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, datastore storage.ProtobufStore) (*CatalogClient, error) { var opts []grpc.DialOption grpcOptions := []grpc_retry.CallOption{ @@ -259,7 +259,11 @@ func NewDataCatalog(ctx context.Context, endpoint string, secureConnection bool, grpc_retry.WithMax(5), } - if secureConnection { + if insecureConnection { + logger.Debug(ctx, "Establishing 
insecure connection to DataCatalog") + opts = append(opts, grpc.WithInsecure()) + } else { + logger.Debug(ctx, "Establishing secure connection to DataCatalog") pool, err := x509.SystemCertPool() if err != nil { return nil, err From 876c9a9a7a62622aa333390bf411d77fc21e32ea Mon Sep 17 00:00:00 2001 From: Andrew Chan Date: Tue, 10 Sep 2019 11:28:38 -0700 Subject: [PATCH 4/5] Add more comments --- config.yaml | 1 + pkg/controller/catalog/datacatalog/datacatalog.go | 4 +++- .../datacatalog/transformer/datacatalog_transformer.go | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index 6bb16eea30..cbfa17fe95 100644 --- a/config.yaml +++ b/config.yaml @@ -91,6 +91,7 @@ admin: catalog-cache: type: catalog endpoint: datacatalog:8089 + insecure: true errors: show-source: true logger: diff --git a/pkg/controller/catalog/datacatalog/datacatalog.go b/pkg/controller/catalog/datacatalog/datacatalog.go index 73db43b221..f8aa16883f 100644 --- a/pkg/controller/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/catalog/datacatalog/datacatalog.go @@ -31,6 +31,7 @@ type CatalogClient struct { store storage.ProtobufStore } +// getArtifactByTag retrieves an artifact by the tag func (m *CatalogClient) getArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error) { logger.Debugf(ctx, "Get Artifact by tag %v", tagName) artifactQuery := &datacatalog.GetArtifactRequest{ @@ -47,6 +48,7 @@ func (m *CatalogClient) getArtifactByTag(ctx context.Context, tagName string, da return response.Artifact, nil } +// getDataset retrieves the dataset that is associated with the task func (m *CatalogClient) getDataset(ctx context.Context, task *core.TaskTemplate) (*datacatalog.Dataset, error) { datasetID, err := transformer.GenerateDatasetIDForTask(ctx, task) if err != nil { @@ -174,7 +176,6 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID logger.Debugf(ctx, "DataCatalog 
put into Catalog for DataSet %v", datasetID) // Try creating the dataset in case it doesn't exist - metadata := &datacatalog.Metadata{ KeyMap: map[string]string{ taskVersionKey: task.Id.Version, @@ -250,6 +251,7 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID return nil } +// NewDataCatalog creates a new DataCatalog client for task execution caching func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, datastore storage.ProtobufStore) (*CatalogClient, error) { var opts []grpc.DialOption diff --git a/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go index 6f66b24f40..c5f643cfdd 100644 --- a/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go +++ b/pkg/controller/catalog/datacatalog/transformer/datacatalog_transformer.go @@ -27,7 +27,9 @@ func getDatasetNameFromTask(task *core.TaskTemplate) string { return fmt.Sprintf("%s-%s", taskNamespace, task.Id.Name) } +// GenerateTaskOutputsFromArtifact transforms the artifact data into task execution outputs as a literal map func GenerateTaskOutputsFromArtifact(task *core.TaskTemplate, artifact *datacatalog.Artifact) (*core.LiteralMap, error) { + // If there are no outputs in the task, return an empty map if task.Interface.Outputs == nil || len(task.Interface.Outputs.Variables) == 0 { return &emptyLiteralMap, nil @@ -110,6 +112,7 @@ func generateTaskSignatureHash(ctx context.Context, task *core.TaskTemplate) (st return fmt.Sprintf("%v-%v", inputHashString, outputHashString), nil } +// GenerateArtifactTagName generates a tag by hashing the input values func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error) { if inputs == nil || len(inputs.Literals) == 0 { inputs = &emptyLiteralMap From 4366de7157feeb4aae919b97ce0603f1352d6c2b Mon Sep 17 00:00:00 2001 From: Andrew Chan Date: Tue, 10 Sep 2019 11:33:48 -0700 Subject: [PATCH 5/5] go-lint cleanup --- 
pkg/controller/catalog/datacatalog/datacatalog_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/catalog/datacatalog/datacatalog_test.go b/pkg/controller/catalog/datacatalog/datacatalog_test.go index d25b94165a..ea3bef477b 100644 --- a/pkg/controller/catalog/datacatalog/datacatalog_test.go +++ b/pkg/controller/catalog/datacatalog/datacatalog_test.go @@ -388,7 +388,7 @@ func TestCatalog_Put(t *testing.T) { createDatasetCalled := false mockClient.On("CreateDataset", ctx, - mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + mock.MatchedBy(func(_ *datacatalog.CreateDatasetRequest) bool { createDatasetCalled = true return true }),