diff --git a/flytepropeller/config.yaml b/flytepropeller/config.yaml
index 787c465d8..b086508ed 100644
--- a/flytepropeller/config.yaml
+++ b/flytepropeller/config.yaml
@@ -59,7 +59,7 @@ storage:
     access-key: minio
     auth-type: accesskey
     disable-ssl: true
-    endpoint: http://localhost:9000
+    endpoint: http://localhost:30084
     region: us-east-1
     secret-key: miniostorage
   type: minio
diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go
index 61996bded..8c01755f2 100644
--- a/flytepropeller/pkg/controller/controller.go
+++ b/flytepropeller/pkg/controller/controller.go
@@ -2,6 +2,11 @@ package controller
 
 import (
 	"context"
+	"runtime/pprof"
+	"time"
+
+	"github.com/lyft/flytestdlib/contextutils"
+	"k8s.io/apimachinery/pkg/labels"
 
 	stdErrs "github.com/lyft/flytestdlib/errors"
 
@@ -33,11 +38,15 @@ import (
 	clientset "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
 	flyteScheme "github.com/lyft/flytepropeller/pkg/client/clientset/versioned/scheme"
 	informers "github.com/lyft/flytepropeller/pkg/client/informers/externalversions"
+	lister "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
 	"github.com/lyft/flytepropeller/pkg/controller/workflow"
 )
 
+const resourceLevelMonitorCycleDuration = 5 * time.Second
+const missing = "missing"
+
 type metrics struct {
 	Scope            promutils.Scope
 	EnqueueCountWf   prometheus.Counter
@@ -57,6 +66,7 @@ type Controller struct {
 	recorder      record.EventRecorder
 	metrics       *metrics
 	leaderElector *leaderelection.LeaderElector
+	levelMonitor  *ResourceLevelMonitor
 }
 
 // Runs either as a leader -if configured- or as a standalone process.
@@ -86,6 +96,9 @@ func (c *Controller) run(ctx context.Context) error {
 		return err
 	}
 
+	// Start the collector process
+	c.levelMonitor.RunCollector(ctx)
+
 	// Start the informer factories to begin populating the informer caches
 	logger.Info(ctx, "Starting FlyteWorkflow controller")
 	return c.workerPool.Run(ctx, c.numWorkers, c.flyteworkflowSynced)
@@ -169,6 +182,101 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler {
 	}
 }
 
+// This object is responsible for emitting metrics that show the current number of Flyte workflows, cut by project and domain.
+// It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also
+// emit a timer measuring how long it takes to run each measurement cycle.
+type ResourceLevelMonitor struct {
+	Scope promutils.Scope
+
+	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
+	CollectorTimer promutils.StopWatch
+
+	// System Observability: This is a labeled gauge that emits the current number of FlyteWorkflow objects in the informer. It is used
+	// to monitor current levels. It currently only splits by project/domain, not workflow status.
+	levels *prometheus.GaugeVec
+
+	// The thing that we want to measure the current levels of
+	lister lister.FlyteWorkflowLister
+}
+
+func (r *ResourceLevelMonitor) countList(ctx context.Context, workflows []*v1alpha1.FlyteWorkflow) map[string]map[string]int {
+	// Map of Projects to Domains to counts
+	counts := map[string]map[string]int{}
+
+	// Collect all workflow metrics
+	for _, wf := range workflows {
+		execID := wf.GetExecutionID()
+		var project string
+		var domain string
+		if execID.WorkflowExecutionIdentifier == nil {
+			logger.Warningf(ctx, "Workflow does not have an execution identifier! [%v]", wf)
+			project = missing
+			domain = missing
+		} else {
+			project = wf.ExecutionID.Project
+			domain = wf.ExecutionID.Domain
+		}
+		if _, ok := counts[project]; !ok {
+			counts[project] = map[string]int{}
+		}
+		counts[project][domain]++
+	}
+
+	return counts
+}
+
+func (r *ResourceLevelMonitor) collect(ctx context.Context) {
+	// Emit gauges at both the project/domain level - aggregation to be handled by Prometheus
+	workflows, err := r.lister.List(labels.Everything())
+	if err != nil {
+		logger.Errorf(ctx, "Error listing workflows when attempting to collect data for gauges %s", err)
+	}
+
+	counts := r.countList(ctx, workflows)
+
+	// Emit labeled metrics, for each project/domain combination. This can be aggregated later with Prometheus queries.
+	metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey}
+	for project, val := range counts {
+		for domain, num := range val {
+			tempContext := contextutils.WithProjectDomain(ctx, project, domain)
+			gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...))
+			if err != nil {
+				panic(err)
+			}
+			gauge.Set(float64(num))
+		}
+	}
+}
+
+func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
+	ticker := time.NewTicker(resourceLevelMonitorCycleDuration)
+	collectorCtx := contextutils.WithGoroutineLabel(ctx, "resource-level-monitor")
+
+	go func() {
+		pprof.SetGoroutineLabels(collectorCtx)
+		for {
+			select {
+			case <-collectorCtx.Done():
+				return
+			case <-ticker.C:
+				t := r.CollectorTimer.Start()
+				r.collect(collectorCtx)
+				t.Stop()
+			}
+		}
+	}()
+}
+
+func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor {
+	return &ResourceLevelMonitor{
+		Scope:          scope,
+		CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
+		levels: scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels",
+			contextutils.ProjectKey.String(), contextutils.DomainKey.String()),
+		lister: lister,
+	}
+}
+
 func newControllerMetrics(scope promutils.Scope) *metrics {
 	c := scope.MustNewCounterVec("wf_enqueue", "workflow enqueue count.", "type")
 	return &metrics{
@@ -298,6 +406,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
 		return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
 	}
 
+	controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())
+
 	nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, launchPlanActor, launchPlanActor,
 		cfg.MaxDatasetSizeBytes, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
 
diff --git a/flytepropeller/pkg/controller/controller_test.go b/flytepropeller/pkg/controller/controller_test.go
new file mode 100644
index 000000000..b2e295935
--- /dev/null
+++ b/flytepropeller/pkg/controller/controller_test.go
@@ -0,0 +1,83 @@
+package controller
+
+import (
+	"context"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
+	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
+	listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
+	"github.com/lyft/flytestdlib/promutils"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+var wfs = []*v1alpha1.FlyteWorkflow{
+	{
+		ExecutionID: v1alpha1.ExecutionID{
+			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
+				Project: "proj",
+				Domain:  "dev",
+				Name:    "name",
+			},
+		},
+	},
+	{
+		ExecutionID: v1alpha1.ExecutionID{
+			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
+				Project: "proj",
+				Domain:  "dev",
+				Name:    "name",
+			},
+		},
+	},
+	{
+		ExecutionID: v1alpha1.ExecutionID{
+			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
+				Project: "proj2",
+				Domain:  "dev",
+				Name:    "name",
+			},
+		},
+	},
+}
+
+func TestNewResourceLevelMonitor(t *testing.T) {
+	lm := ResourceLevelMonitor{}
+	res := lm.countList(context.Background(), wfs)
+	assert.Equal(t, 2, res["proj"]["dev"])
+	assert.Equal(t, 1, res["proj2"]["dev"])
+}
+
+type mockWFLister struct {
+	listers.FlyteWorkflowLister
+}
+
+func (m mockWFLister) List(_ labels.Selector) (ret []*v1alpha1.FlyteWorkflow, err error) {
+	return wfs, nil
+}
+
+func TestResourceLevelMonitor_collect(t *testing.T) {
+	scope := promutils.NewScope("testscope")
+	g := scope.MustNewGaugeVec("unittest", "testing", "project", "domain")
+	lm := &ResourceLevelMonitor{
+		Scope:          scope,
+		CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
+		levels:         g,
+		lister:         mockWFLister{},
+	}
+	lm.collect(context.Background())
+
+	var expected = `
+		# HELP testscope:unittest testing
+		# TYPE testscope:unittest gauge
+		testscope:unittest{domain="dev",project="proj"} 2
+		testscope:unittest{domain="dev",project="proj2"} 1
+	`
+
+	err := testutil.CollectAndCompare(g, strings.NewReader(expected))
+	assert.NoError(t, err)
+}
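
For context on how the new collector is driven end to end, the following is a minimal sketch only, not part of the change: the stubLister type, the "flytepropeller"/"collector" scope names, and the sleep duration are illustrative assumptions. In the controller itself the lister comes from flyteworkflowInformer.Lister() inside New(), and RunCollector stops when the controller's context is cancelled.

package main

import (
	"context"
	"time"

	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
	listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
	"github.com/lyft/flytepropeller/pkg/controller"
	"github.com/lyft/flytestdlib/promutils"
	"k8s.io/apimachinery/pkg/labels"
)

// stubLister is a hypothetical stand-in for flyteworkflowInformer.Lister();
// it satisfies the same FlyteWorkflowLister interface the monitor consumes.
type stubLister struct {
	listers.FlyteWorkflowLister
}

// List returns a fixed (empty) set of workflows; a real lister reads from the informer cache.
func (s stubLister) List(_ labels.Selector) ([]*v1alpha1.FlyteWorkflow, error) {
	return []*v1alpha1.FlyteWorkflow{}, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirrors the wiring in New(): a sub-scope for the collector plus a workflow lister.
	scope := promutils.NewScope("flytepropeller").NewSubScope("collector")
	monitor := controller.NewResourceLevelMonitor(scope, stubLister{})

	// Starts the background goroutine; the gauge is refreshed every cycle until ctx is cancelled.
	monitor.RunCollector(ctx)

	// Let a couple of collection cycles run before exiting (illustrative only).
	time.Sleep(12 * time.Second)
}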