Skip to content

Commit

Permalink
Hive query tags (flyteorg#43)
Browse files Browse the repository at this point in the history
Before the plugin api refactor, we used to add these [tags](https://github.com/lyft/flyteplugins/blob/ca70572f2eaf718be65fe92fb7887aee9a408c20/go/tasks/v1/qubole/hive_executor.go#L173-L176) but they were lost in translation.  This adds them back.
  • Loading branch information
wild-endeavor authored Dec 30, 2019
1 parent fb6ca98 commit 84df181
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
6 changes: 5 additions & 1 deletion flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (

query = hiveJob.Query.GetQuery()
cluster = hiveJob.ClusterLabel
tags = hiveJob.Tags
timeoutSec = hiveJob.Query.TimeoutSec
tags = hiveJob.Tags
tags = append(tags, fmt.Sprintf("ns:%s", tCtx.TaskExecutionMetadata().GetNamespace()))
for k, v := range tCtx.TaskExecutionMetadata().GetLabels() {
tags = append(tags, fmt.Sprintf("%s:%s", k, v))
}

return
}
Expand Down
13 changes: 10 additions & 3 deletions flyteplugins/go/tasks/plugins/hive/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package hive

import (
"context"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"
"net/url"
"testing"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"

mocks2 "github.com/lyft/flytestdlib/cache/mocks"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks"
pluginsCoreMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/lyft/flyteplugins/go/tasks/plugins/hive/client"
quboleMocks "github.com/lyft/flyteplugins/go/tasks/plugins/hive/client/mocks"
"github.com/lyft/flyteplugins/go/tasks/plugins/hive/config"
Expand Down Expand Up @@ -70,18 +72,23 @@ func TestGetQueryInfo(t *testing.T) {
mockTaskExecutionContext := mocks.TaskExecutionContext{}
mockTaskExecutionContext.On("TaskReader").Return(mockTaskReader)

taskMetadata := &pluginsCoreMocks.TaskExecutionMetadata{}
taskMetadata.On("GetNamespace").Return("myproject-staging")
taskMetadata.On("GetLabels").Return(map[string]string{"sample": "label"})
mockTaskExecutionContext.On("TaskExecutionMetadata").Return(taskMetadata)

query, cluster, tags, timeout, err := GetQueryInfo(ctx, &mockTaskExecutionContext)
assert.NoError(t, err)
assert.Equal(t, "select 'one'", query)
assert.Equal(t, "default", cluster)
assert.Equal(t, []string{"flyte_plugin_test"}, tags)
assert.Equal(t, []string{"flyte_plugin_test", "ns:myproject-staging", "sample:label"}, tags)
assert.Equal(t, 500, int(timeout))
}

func TestValidateQuboleHiveJob(t *testing.T) {
hiveJob := plugins.QuboleHiveJob{
ClusterLabel: "default",
Tags: []string{"flyte_plugin_test"},
Tags: []string{"flyte_plugin_test", "sample:label"},
Query: nil,
}
err := validateQuboleHiveJob(hiveJob)
Expand Down

0 comments on commit 84df181

Please sign in to comment.