Skip to content

Commit

Permalink
Adding metrics for external Plugins - Hive, Batch and Presto (flyteor…
Browse files Browse the repository at this point in the history
…g#80)

* Adding metrics for external Plugins - Hive, Batch and Presto
* Fixes
  • Loading branch information
anandswaminathan authored Apr 23, 2020
1 parent 209c52d commit 7361bf6
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 41 deletions.
10 changes: 6 additions & 4 deletions go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Executor struct {

outputAssembler array.OutputAssembler
errorAssembler array.OutputAssembler
metrics ExecutorMetrics
}

func (e Executor) GetID() string {
Expand Down Expand Up @@ -74,12 +75,12 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
fallthrough

case arrayCore.PhaseLaunch:
pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState)
pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState, e.metrics)

case arrayCore.PhaseCheckingSubTaskExecutions:
pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(),
tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(),
e.jobStore, tCtx.DataStore(), pluginConfig, pluginState)
e.jobStore, tCtx.DataStore(), pluginConfig, pluginState, e.metrics)

case arrayCore.PhaseAssembleFinalOutput:
pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, pluginState.State)
Expand Down Expand Up @@ -120,11 +121,11 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
}

func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error {
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Aborted")
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Aborted", e.metrics)
}

func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error {
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Finalized")
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Finalized", e.metrics)
}

func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Config,
Expand Down Expand Up @@ -164,6 +165,7 @@ func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Con
jobDefinitionCache: definition.NewCache(cfg.JobDefCacheSize),
outputAssembler: outputAssembler,
errorAssembler: errorAssembler,
metrics: getAwsBatchExecutorMetrics(scope.NewSubScope("awsbatch")),
}, nil
}

Expand Down
28 changes: 28 additions & 0 deletions go/tasks/plugins/array/awsbatch/executor_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package awsbatch

import (
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
)

type ExecutorMetrics struct {
Scope promutils.Scope
SubTasksSubmitted labeled.Counter
SubTasksSucceeded labeled.Counter
SubTasksFailed labeled.Counter
BatchJobTerminated labeled.Counter
}

func getAwsBatchExecutorMetrics(scope promutils.Scope) ExecutorMetrics {
return ExecutorMetrics{
Scope: scope,
SubTasksSubmitted: labeled.NewCounter("sub_task_submitted",
"Sub tasks submitted", scope),
SubTasksSucceeded: labeled.NewCounter("batch_task_success",
"Batch tasks successful", scope),
SubTasksFailed: labeled.NewCounter("batch_task_failure",
"Batch tasks failure", scope),
BatchJobTerminated: labeled.NewCounter("batch_job_terminated",
"Batch job terminated", scope),
}
}
17 changes: 12 additions & 5 deletions go/tasks/plugins/array/awsbatch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, pluginConfig *config.Config,
currentState *State) (nextState *State, err error) {

currentState *State, metrics ExecutorMetrics) (nextState *State, err error) {
logger.Debugf(ctx, "Entering LaunchSubTasks ")
size := currentState.GetExecutionArraySize()
if int64(currentState.GetExecutionArraySize()) > pluginConfig.MaxArrayJobSize {
ee := fmt.Errorf("array size > max allowed. Requested [%v]. Allowed [%v]", currentState.GetExecutionArraySize(), pluginConfig.MaxArrayJobSize)
logger.Info(ctx, ee)
Expand All @@ -35,7 +36,6 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
return nil, err
}

size := currentState.GetExecutionArraySize()
t, err := tCtx.TaskReader().Read(ctx)
if err != nil {
return nil, err
Expand All @@ -53,6 +53,9 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
return nil, err
}

metrics.SubTasksSubmitted.Add(ctx, float64(size))
logger.Debugf(ctx, "BatchTasks submitted")

parentState := currentState.
SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0).
SetArrayStatus(arraystatus.ArrayStatus{
Expand All @@ -71,7 +74,7 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl

// Attempts to terminate the AWS Job if one is recorded in the pluginState. This API is idempotent and should be safe
// to call multiple times on the same job. It'll result in multiple calls to AWS Batch in that case, however.
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string) error {
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string, metrics ExecutorMetrics) error {
pluginState := &State{}
if _, err := tCtx.PluginStateReader().Get(pluginState); err != nil {
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state")
Expand All @@ -89,7 +92,11 @@ func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batc
if pluginState.GetExternalJobID() != nil {
jobID := *pluginState.GetExternalJobID()
logger.Infof(ctx, "Cancelling AWS Job [%v] because [%v].", jobID, reason)
return batchClient.TerminateJob(ctx, jobID, reason)
err := batchClient.TerminateJob(ctx, jobID, reason)
if err != nil {
return err
}
metrics.BatchJobTerminated.Inc(ctx)
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package awsbatch
import (
"testing"

"github.com/lyft/flytestdlib/promutils"

"github.com/stretchr/testify/mock"

"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -122,7 +124,7 @@ func TestLaunchSubTasks(t *testing.T) {
JobDefinitionArn: "arn",
}

newState, err := LaunchSubTasks(context.TODO(), tCtx, batchClient, &config.Config{MaxArrayJobSize: 10}, currentState)
newState, err := LaunchSubTasks(context.TODO(), tCtx, batchClient, &config.Config{MaxArrayJobSize: 10}, currentState, getAwsBatchExecutorMetrics(promutils.NewTestScope()))
assert.NoError(t, err)
assertEqual(t, expectedState, newState)
})
Expand All @@ -143,7 +145,7 @@ func TestTerminateSubTasks(t *testing.T) {
batchClient.OnTerminateJob(ctx, "abc-123", "Test terminate").Return(nil).Once()

t.Run("Simple", func(t *testing.T) {
assert.NoError(t, TerminateSubTasks(ctx, tCtx, batchClient, "Test terminate"))
assert.NoError(t, TerminateSubTasks(ctx, tCtx, batchClient, "Test terminate", getAwsBatchExecutorMetrics(promutils.NewTestScope())))
})

batchClient.AssertExpectations(t)
Expand Down
12 changes: 8 additions & 4 deletions go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ func createSubJobList(count int) []*Job {
}

func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix, baseOutputSandbox storage.DataReference, jobStore *JobStore,
dataStore *storage.DataStore, cfg *config.Config, currentState *State) (newState *State, err error) {

dataStore *storage.DataStore, cfg *config.Config, currentState *State, metrics ExecutorMetrics) (newState *State, err error) {
logger.Debugf(ctx, "Entering CheckSubTasksState ")
newState = currentState
parentState := currentState.State

jobName := taskMeta.GetTaskExecutionID().GetGeneratedName()
job := jobStore.Get(jobName)
// If job isn't currently being monitored (recovering from a restart?), add it to the sync-cache and return
Expand Down Expand Up @@ -115,11 +114,16 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata
parentState = parentState.SetArrayStatus(newArrayStatus)
// Based on the summary produced above, deduce the overall phase of the task.
phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary)

if phase != arrayCore.PhaseCheckingSubTaskExecutions {
metrics.SubTasksSucceeded.Add(ctx, float64(newArrayStatus.Summary[core.PhaseSuccess]))
totalFailed := newArrayStatus.Summary[core.PhasePermanentFailure] + newArrayStatus.Summary[core.PhaseRetryableFailure]
metrics.SubTasksFailed.Add(ctx, float64(totalFailed))
}
if phase == arrayCore.PhaseWriteToDiscoveryThenFail {
errorMsg := msg.Summary(cfg.MaxErrorStringLength)
parentState = parentState.SetReason(errorMsg)
}

if phase == arrayCore.PhaseCheckingSubTaskExecutions {
newPhaseVersion := uint32(0)
// For now, the only changes to PhaseVersion and PreviousSummary occur for running array jobs.
Expand Down
6 changes: 3 additions & 3 deletions go/tasks/plugins/array/awsbatch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
})
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
})
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
})
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down
7 changes: 6 additions & 1 deletion go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,13 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur
metric.ResourceWaitTime.Observe(waitTime.Seconds())

if allocationStatus == core.AllocationStatusGranted {
metric.AllocationGranted.Inc(ctx)
newState.Phase = PhaseQueued
} else if allocationStatus == core.AllocationStatusExhausted {
metric.AllocationNotGranted.Inc(ctx)
newState.Phase = PhaseNotStarted
} else if allocationStatus == core.AllocationStatusNamespaceQuotaExceeded {
metric.AllocationNotGranted.Inc(ctx)
newState.Phase = PhaseNotStarted
} else {
return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]",
Expand Down Expand Up @@ -411,7 +414,7 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe
return nil
}

func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error {
func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics QuboleHiveExecutorMetrics) error {
// Release allocation token
uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx)
Expand All @@ -422,9 +425,11 @@ func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionSt
err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueID)

if err != nil {
metrics.ResourceReleaseFailed.Inc(ctx)
logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueID, err)
return err
}
metrics.ResourceReleased.Inc(ctx)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/hive/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestFinalize(t *testing.T) {
called = true
}).Return(nil)

err := Finalize(ctx, tCtx, state)
err := Finalize(ctx, tCtx, state, getQuboleHiveExecutorMetrics(promutils.NewTestScope()))
assert.NoError(t, err)
assert.True(t, called)
}
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/hive/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutio
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize")
}

return Finalize(ctx, tCtx, incomingState)
return Finalize(ctx, tCtx, incomingState, q.metrics)
}

func (q QuboleHiveExecutor) GetProperties() core.PluginProperties {
Expand Down Expand Up @@ -150,7 +150,7 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient
return QuboleHiveExecutor{
id: quboleHiveExecutorID,
cfg: cfg,
metrics: getQuboleHiveExecutorMetrics(scope),
metrics: getQuboleHiveExecutorMetrics(scope.NewSubScope("hive")),
quboleClient: quboleClient,
executionsCache: executionsAutoRefreshCache,
}, nil
Expand Down
17 changes: 10 additions & 7 deletions go/tasks/plugins/hive/executor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

type QuboleHiveExecutorMetrics struct {
Scope promutils.Scope
ReleaseResourceFailed labeled.Counter
ResourceReleased labeled.Counter
ResourceReleaseFailed labeled.Counter
AllocationGranted labeled.Counter
AllocationNotGranted labeled.Counter
ResourceWaitTime prometheus.Summary
Expand All @@ -21,12 +22,14 @@ var (
func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetrics {
return QuboleHiveExecutorMetrics{
Scope: scope,
ReleaseResourceFailed: labeled.NewCounter("released_resource_failed",
"Error releasing allocation token", scope),
AllocationGranted: labeled.NewCounter("allocation_granted",
"Allocation request granted", scope),
AllocationNotGranted: labeled.NewCounter("allocation_not_granted",
"Allocation request did not fail but not granted", scope),
ResourceReleased: labeled.NewCounter("resource_release_success",
"Resource allocation token released", scope, labeled.EmitUnlabeledMetric),
ResourceReleaseFailed: labeled.NewCounter("resource_release_failed",
"Error releasing allocation token", scope, labeled.EmitUnlabeledMetric),
AllocationGranted: labeled.NewCounter("allocation_grant_success",
"Allocation request granted", scope, labeled.EmitUnlabeledMetric),
AllocationNotGranted: labeled.NewCounter("allocation_grant_failed",
"Allocation request did not fail but not granted", scope, labeled.EmitUnlabeledMetric),
ResourceWaitTime: scope.MustNewSummaryWithOptions("resource_wait_time", "Duration the execution has been waiting for a resource allocation token",
promutils.SummaryOptions{Objectives: tokenAgeObjectives}),
}
Expand Down
7 changes: 6 additions & 1 deletion go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,13 @@ func GetAllocationToken(
}

if allocationStatus == core.AllocationStatusGranted {
metric.AllocationGranted.Inc(ctx)
newState.CurrentPhase = PhaseQueued
} else if allocationStatus == core.AllocationStatusExhausted {
metric.AllocationNotGranted.Inc(ctx)
newState.CurrentPhase = PhaseNotStarted
} else if allocationStatus == core.AllocationStatusNamespaceQuotaExceeded {
metric.AllocationNotGranted.Inc(ctx)
newState.CurrentPhase = PhaseNotStarted
} else {
return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]",
Expand Down Expand Up @@ -532,7 +535,7 @@ func Abort(ctx context.Context, currentState ExecutionState, client client.Prest
return nil
}

func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error {
func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics ExecutorMetrics) error {
// Release allocation token
uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
routingGroup, err := composeResourceNamespaceWithRoutingGroup(ctx, tCtx)
Expand All @@ -543,9 +546,11 @@ func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionSt
err = tCtx.ResourceManager().ReleaseResource(ctx, routingGroup, uniqueID)

if err != nil {
metrics.ResourceReleaseFailed.Inc(ctx)
logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueID, err)
return err
}
metrics.ResourceReleased.Inc(ctx)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/presto/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestFinalize(t *testing.T) {
called = true
}).Return(nil)

err := Finalize(ctx, tCtx, state)
err := Finalize(ctx, tCtx, state, getPrestoExecutorMetrics(promutils.NewTestScope()))
assert.NoError(t, err)
assert.True(t, called)
}
Expand Down
7 changes: 4 additions & 3 deletions go/tasks/plugins/presto/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (p Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext)
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize")
}

return Finalize(ctx, tCtx, incomingState)
return Finalize(ctx, tCtx, incomingState, p.metrics)
}

func (p Executor) GetProperties() core.PluginProperties {
Expand Down Expand Up @@ -125,7 +125,8 @@ func NewPrestoExecutor(
cfg *config.Config,
prestoClient client.PrestoClient,
scope promutils.Scope) (Executor, error) {
executionsAutoRefreshCache, err := NewPrestoExecutionsCache(ctx, prestoClient, cfg, scope.NewSubScope(prestoTaskType))
subScope := scope.NewSubScope(prestoTaskType)
executionsAutoRefreshCache, err := NewPrestoExecutionsCache(ctx, prestoClient, cfg, subScope)
if err != nil {
logger.Errorf(ctx, "Failed to create AutoRefreshCache in Executor Setup. Error: %v", err)
return Executor{}, err
Expand All @@ -139,7 +140,7 @@ func NewPrestoExecutor(
return Executor{
id: prestoPluginID,
cfg: cfg,
metrics: getPrestoExecutorMetrics(scope),
metrics: getPrestoExecutorMetrics(subScope),
prestoClient: prestoClient,
executionsCache: executionsAutoRefreshCache,
}, nil
Expand Down
Loading

0 comments on commit 7361bf6

Please sign in to comment.