From 077d75d65c2f17c7b9e56382e8e5a69eea98111f Mon Sep 17 00:00:00 2001 From: Catalin Toda Date: Fri, 23 Oct 2020 13:29:08 -0700 Subject: [PATCH] Add workflowId and workflowExecutionId information --- go/tasks/plugins/hive/client/qubole_client.go | 14 ++++++++------ go/tasks/plugins/hive/execution_state.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/go/tasks/plugins/hive/client/qubole_client.go b/go/tasks/plugins/hive/client/qubole_client.go index 747337a36..2bb65c4e9 100644 --- a/go/tasks/plugins/hive/client/qubole_client.go +++ b/go/tasks/plugins/hive/client/qubole_client.go @@ -44,12 +44,14 @@ type QuboleCommandDetails struct { } type CommandMetadata struct { - TaskName string - Domain string - Project string - Labels map[string]string - AttemptNumber uint32 - MaxAttempts uint32 + TaskName string + Domain string + Project string + Labels map[string]string + AttemptNumber uint32 + MaxAttempts uint32 + WorkflowID string + WorkflowExecutionID string } // QuboleClient API Request Body, meant to be passed into JSON.marshal diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index e0c61f1d8..e5b08b346 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -7,6 +7,7 @@ import ( "time" "github.com/lyft/flytestdlib/cache" + "github.com/lyft/flytestdlib/contextutils" idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" @@ -344,11 +345,13 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt taskExecutionIdentifier := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() commandMetadata := client.CommandMetadata{TaskName: taskName, - Domain: taskExecutionIdentifier.GetTaskId().GetDomain(), - Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(), - Labels: tCtx.TaskExecutionMetadata().GetLabels(), - AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(), - MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(), + Domain: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetDomain(), + Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(), + Labels: tCtx.TaskExecutionMetadata().GetLabels(), + AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(), + MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(), + WorkflowExecutionID: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetName(), + WorkflowID: contextutils.Value(ctx, contextutils.WorkflowIDKey), } cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec,