Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Current level gauge for FlyteWorkflows #132

Merged
merged 12 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ storage:
access-key: minio
auth-type: accesskey
disable-ssl: true
endpoint: http://localhost:9000
endpoint: http://localhost:30084
region: us-east-1
secret-key: miniostorage
type: minio
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package controller

import (
"context"
"runtime/pprof"
"time"

"github.com/lyft/flytestdlib/contextutils"
"k8s.io/apimachinery/pkg/labels"

stdErrs "github.com/lyft/flytestdlib/errors"

Expand Down Expand Up @@ -33,11 +38,15 @@ import (
clientset "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
flyteScheme "github.com/lyft/flytepropeller/pkg/client/clientset/versioned/scheme"
informers "github.com/lyft/flytepropeller/pkg/client/informers/externalversions"
lister "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/lyft/flytepropeller/pkg/controller/workflow"
)

// resourceLevelMonitorCycleDuration is the fixed interval between level-collection cycles; deliberately not configurable.
const resourceLevelMonitorCycleDuration = 5 * time.Second

// missing is the placeholder project/domain label used when a workflow has no execution identifier.
const missing = "missing"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D nit: missingProject ? or something descriptive


type metrics struct {
Scope promutils.Scope
EnqueueCountWf prometheus.Counter
Expand All @@ -57,6 +66,7 @@ type Controller struct {
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
}

// Runs either as a leader -if configured- or as a standalone process.
Expand Down Expand Up @@ -86,6 +96,9 @@ func (c *Controller) run(ctx context.Context) error {
return err
}

// Start the collector process
c.levelMonitor.RunCollector(ctx)

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
return c.workerPool.Run(ctx, c.numWorkers, c.flyteworkflowSynced)
Expand Down Expand Up @@ -169,6 +182,101 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler {
}
}

// ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows,
// cut by project and domain. It needs to be kicked off via RunCollector. The periodicity is not currently
// configurable because it seems unnecessary. It also maintains a timer measuring how long it takes to run
// each measurement cycle.
type ResourceLevelMonitor struct {
	// Scope is the metrics scope under which all of this monitor's metrics are registered.
	Scope promutils.Scope

	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
	CollectorTimer promutils.StopWatch

	// System Observability: This is a labeled gauge that emits the current number of FlyteWorkflow objects in the informer. It is used
	// to monitor current levels. It currently only splits by project/domain, not workflow status.
	levels *prometheus.GaugeVec

	// The thing that we want to measure the current levels of
	lister lister.FlyteWorkflowLister
}

func (r *ResourceLevelMonitor) countList(ctx context.Context, workflows []*v1alpha1.FlyteWorkflow) map[string]map[string]int {
// Map of Projects to Domains to counts
counts := map[string]map[string]int{}

// Collect all workflow metrics
for _, wf := range workflows {
execID := wf.GetExecutionID()
var project string
var domain string
if execID.WorkflowExecutionIdentifier == nil {
logger.Warningf(ctx, "Workflow does not have an execution identifier! [%v]", wf)
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
project = missing
domain = missing
} else {
project = wf.ExecutionID.Project
domain = wf.ExecutionID.Domain
}
if _, ok := counts[project]; !ok {
counts[project] = map[string]int{}
}
counts[project][domain]++
}

return counts
}

func (r *ResourceLevelMonitor) collect(ctx context.Context) {
// Emit gauges at both the project/domain level - aggregation to be handled by Prometheus
workflows, err := r.lister.List(labels.Everything())
if err != nil {
logger.Errorf(ctx, "Error listing workflows when attempting to collect data for gauges %s", err)
}

counts := r.countList(ctx, workflows)

// Emit labeled metrics, for each project/domain combination. This can be aggregated later with Prometheus queries.
metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey}
wild-endeavor marked this conversation as resolved.
Show resolved Hide resolved
for project, val := range counts {
for domain, num := range val {
tempContext := contextutils.WithProjectDomain(ctx, project, domain)
gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...))
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}
gauge.Set(float64(num))
}
}
}

// RunCollector launches a long-running goroutine that re-collects and emits the workflow level gauges
// every resourceLevelMonitorCycleDuration. The goroutine exits, and its ticker is released, when ctx is
// cancelled.
func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
	ticker := time.NewTicker(resourceLevelMonitorCycleDuration)
	collectorCtx := contextutils.WithGoroutineLabel(ctx, "resource-level-monitor")

	go func() {
		// Stop the ticker when the goroutine exits so its underlying resources are released.
		defer ticker.Stop()
		// Label the goroutine so it is identifiable in pprof profiles.
		pprof.SetGoroutineLabels(collectorCtx)
		for {
			select {
			case <-collectorCtx.Done():
				return
			case <-ticker.C:
				// Time each cycle via the meta stopwatch.
				t := r.CollectorTimer.Start()
				r.collect(collectorCtx)
				t.Stop()
			}
		}
	}()
}

// NewResourceLevelMonitor builds a ResourceLevelMonitor that counts the workflows visible to the given
// lister, registering its collection-cycle stopwatch and per-project/domain gauge under the given scope.
func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor {
	rlm := &ResourceLevelMonitor{
		Scope:  scope,
		lister: lister,
	}
	rlm.CollectorTimer = scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond)
	rlm.levels = scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels",
		contextutils.ProjectKey.String(), contextutils.DomainKey.String())
	return rlm
}

func newControllerMetrics(scope promutils.Scope) *metrics {
c := scope.MustNewCounterVec("wf_enqueue", "workflow enqueue count.", "type")
return &metrics{
Expand Down Expand Up @@ -298,6 +406,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
Expand Down
83 changes: 83 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package controller

import (
"context"
"strings"
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)

// wfs is a shared test fixture: two workflows in project "proj" / domain "dev" and one in
// project "proj2" / domain "dev", so countList should report {proj: {dev: 2}, proj2: {dev: 1}}.
var wfs = []*v1alpha1.FlyteWorkflow{
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj2",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
}

// TestNewResourceLevelMonitor verifies that countList tallies the fixture workflows by project and domain.
func TestNewResourceLevelMonitor(t *testing.T) {
	monitor := ResourceLevelMonitor{}
	counts := monitor.countList(context.Background(), wfs)
	assert.Equal(t, 2, counts["proj"]["dev"])
	assert.Equal(t, 1, counts["proj2"]["dev"])
}

// mockWFLister satisfies listers.FlyteWorkflowLister by embedding it, overriding only List.
type mockWFLister struct {
	listers.FlyteWorkflowLister
}

// List ignores the selector and always returns the canned wfs fixture with no error.
func (m mockWFLister) List(_ labels.Selector) ([]*v1alpha1.FlyteWorkflow, error) {
	return wfs, nil
}

// TestResourceLevelMonitor_collect verifies that collect emits one gauge per project/domain pair,
// with values matching the fixture counts, by comparing against Prometheus text exposition output.
func TestResourceLevelMonitor_collect(t *testing.T) {
	scope := promutils.NewScope("testscope")
	gauges := scope.MustNewGaugeVec("unittest", "testing", "project", "domain")
	monitor := &ResourceLevelMonitor{
		Scope:          scope,
		CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
		levels:         gauges,
		lister:         mockWFLister{},
	}

	monitor.collect(context.Background())

	expected := `
# HELP testscope:unittest testing
# TYPE testscope:unittest gauge
testscope:unittest{domain="dev",project="proj"} 2
testscope:unittest{domain="dev",project="proj2"} 1
`

	assert.NoError(t, testutil.CollectAndCompare(gauges, strings.NewReader(expected)))
}