Skip to content

Commit

Permalink
Add workflowId and workflowExecutionId information (flyteorg#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
catalinii authored Oct 28, 2020
1 parent 45be92a commit 2349d52
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
14 changes: 8 additions & 6 deletions flyteplugins/go/tasks/plugins/hive/client/qubole_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ type QuboleCommandDetails struct {
}

type CommandMetadata struct {
TaskName string
Domain string
Project string
Labels map[string]string
AttemptNumber uint32
MaxAttempts uint32
TaskName string
Domain string
Project string
Labels map[string]string
AttemptNumber uint32
MaxAttempts uint32
WorkflowID string
WorkflowExecutionID string
}

// QuboleClient API Request Body, meant to be passed into JSON.marshal
Expand Down
13 changes: 8 additions & 5 deletions flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/lyft/flytestdlib/cache"
"github.com/lyft/flytestdlib/contextutils"

idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"
Expand Down Expand Up @@ -344,11 +345,13 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt

taskExecutionIdentifier := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
commandMetadata := client.CommandMetadata{TaskName: taskName,
Domain: taskExecutionIdentifier.GetTaskId().GetDomain(),
Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(),
Labels: tCtx.TaskExecutionMetadata().GetLabels(),
AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(),
MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(),
Domain: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetDomain(),
Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(),
Labels: tCtx.TaskExecutionMetadata().GetLabels(),
AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(),
MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(),
WorkflowExecutionID: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetName(),
WorkflowID: contextutils.Value(ctx, contextutils.WorkflowIDKey),
}

cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec,
Expand Down

0 comments on commit 2349d52

Please sign in to comment.