From 2349d52bfb6dd96d7c92748338e9e38cbafff7ee Mon Sep 17 00:00:00 2001 From: catalinii Date: Wed, 28 Oct 2020 12:16:49 -0700 Subject: [PATCH] Add workflowId and workflowExecutionId information (#133) --- .../go/tasks/plugins/hive/client/qubole_client.go | 14 ++++++++------ .../go/tasks/plugins/hive/execution_state.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go b/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go index 747337a36c..2bb65c4e95 100644 --- a/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go +++ b/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go @@ -44,12 +44,14 @@ type QuboleCommandDetails struct { } type CommandMetadata struct { - TaskName string - Domain string - Project string - Labels map[string]string - AttemptNumber uint32 - MaxAttempts uint32 + TaskName string + Domain string + Project string + Labels map[string]string + AttemptNumber uint32 + MaxAttempts uint32 + WorkflowID string + WorkflowExecutionID string } // QuboleClient API Request Body, meant to be passed into JSON.marshal diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index e0c61f1d8b..e5b08b3467 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -7,6 +7,7 @@ import ( "time" "github.com/lyft/flytestdlib/cache" + "github.com/lyft/flytestdlib/contextutils" idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" @@ -344,11 +345,13 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt taskExecutionIdentifier := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() commandMetadata := client.CommandMetadata{TaskName: taskName, - Domain: taskExecutionIdentifier.GetTaskId().GetDomain(), - Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(), - Labels: tCtx.TaskExecutionMetadata().GetLabels(), - AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(), - MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(), + Domain: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetDomain(), + Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(), + Labels: tCtx.TaskExecutionMetadata().GetLabels(), + AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(), + MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(), + WorkflowExecutionID: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetName(), + WorkflowID: contextutils.Value(ctx, contextutils.WorkflowIDKey), } cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec,