From e502a4f7eb2af4400c69003b6085dbd4be588e54 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 3 Dec 2019 13:10:33 -0800 Subject: [PATCH] qubole validation for older versions of the sdk (#34) Prior versions of the SDK did not fill in this value. When this code was written, it was assumed that all prior versions of the SDK would've been deprecated, which was a bad assumption. --- go/tasks/plugins/hive/execution_state.go | 12 ++++++++++++ go/tasks/plugins/hive/execution_state_test.go | 11 +++++++++++ 2 files changed, 23 insertions(+) diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index 73e6f294f..978c33893 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -166,6 +166,14 @@ func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceName return newState, nil } +func validateQuboleHiveJob(hiveJob plugins.QuboleHiveJob) error { + if hiveJob.Query == nil { + return errors.Errorf(errors.BadTaskSpecification, + "Query could not be found. Please ensure that you are at least on Flytekit version 0.3.0 or later.") + } + return nil +} + // This function is the link between the output written by the SDK, and the execution side. It extracts the query // out of the task template. func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( @@ -182,6 +190,10 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( return "", "", []string{}, 0, err } + if err := validateQuboleHiveJob(hiveJob); err != nil { + return "", "", []string{}, 0, err + } + query = hiveJob.Query.GetQuery() cluster = hiveJob.ClusterLabel tags = hiveJob.Tags diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go index 64ac37aa4..d080646fe 100644 --- a/go/tasks/plugins/hive/execution_state_test.go +++ b/go/tasks/plugins/hive/execution_state_test.go @@ -2,6 +2,7 @@ package hive import ( "context" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" "net/url" "testing" @@ -77,6 +78,16 @@ func TestGetQueryInfo(t *testing.T) { assert.Equal(t, 500, int(timeout)) } +func TestValidateQuboleHiveJob(t *testing.T) { + hiveJob := plugins.QuboleHiveJob{ + ClusterLabel: "default", + Tags: []string{"flyte_plugin_test"}, + Query: nil, + } + err := validateQuboleHiveJob(hiveJob) + assert.Error(t, err) +} + func TestConstructTaskLog(t *testing.T) { expected := "https://wellness.qubole.com/v2/analyze?command_id=123" u, err := url.Parse(expected)