diff --git a/Gopkg.lock b/Gopkg.lock index b407a37eee28..bd09d53d4872 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,12 @@ [[projects]] - digest = "1:2b10b9a545074605403d32baf9dda24b7582976ba7e9b46c4c7b9da9edac03e7" + digest = "1:8b95956b70e181b19025c7ba3578fdfd8efbec4ce916490700488afb9218972c" name = "cloud.google.com/go" packages = ["compute/metadata"] pruneopts = "" - revision = "aad3f485ee528456e0768f20397b4d9dd941e755" - version = "v0.25.0" + revision = "64a2037ec6be8a4b0c1d1f706ed35b428b989239" + version = "v0.26.0" [[projects]] digest = "1:b9660f5e3522b899d32b1f9bb98056203d6f76f673e1843eaa00869330103ba5" @@ -33,17 +33,18 @@ revision = "de5bf2ad457846296e2031421a34e2568e304e35" [[projects]] - digest = "1:cc7677c0448a5faa80a464041d4247016f1641740af3c3bcba1cd0640bbad636" + digest = "1:b6b8cfb7d169d6ce827f955034d251b907d3a56f453f35c4e9c7dd5778fdecd5" name = "github.com/argoproj/argo-cd" packages = [ "errors", "util/cache", "util/diff", + "util/json", "util/kube", ] pruneopts = "" - revision = "08c63ec234561516ef77ce9672e3bee683998571" - version = "v0.6.2" + revision = "da7be2e3cae13320e9714beba644aec24dc7c0fa" + version = "v0.7.1" [[projects]] branch = "master" @@ -57,6 +58,14 @@ pruneopts = "" revision = "881057947d921c5d62af84ad15cd3c6fb36d6077" +[[projects]] + branch = "master" + digest = "1:c0bec5f9b98d0bc872ff5e834fac186b807b656683bd29cb82fb207a1513fabb" + name = "github.com/beorn7/perks" + packages = ["quantile"] + pruneopts = "" + revision = "3a771d992973f24aa725d07868b467d1ddfceafb" + [[projects]] digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" @@ -204,12 +213,12 @@ version = "v6.13.2" [[projects]] - digest = "1:94ab904da3a528888677f39d32f9625ea8b2d639146af9c79ff97bda3e8554ae" + digest = "1:8ac51c6e631370a05c2fb01715168a3a9f76572b275d85f07c7fc89f5e1782e8" name = "github.com/gobuffalo/packr" packages = ["."] pruneopts = "" - revision = "147bee9cde84aeca6693d1b1aedc595298f24d5b" - version = "v1.12.0" + revision = "9e76dbe10b00dd1fa8f29cfa4fac72297beb32a4" + version = "v1.12.1" [[projects]] digest = "1:6e73003ecd35f4487a5e88270d3ca0a81bc80dc88053ac7e4dcfec5fba30d918" @@ -234,10 +243,12 @@ digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b" name = "github.com/golang/protobuf" packages = [ + "jsonpb", "proto", "ptypes", "ptypes/any", "ptypes/duration", + "ptypes/struct", "ptypes/timestamp", ] pruneopts = "" @@ -264,6 +275,18 @@ revision = "7c663266750e7d82587642f65e60bc4083f1f84e" version = "v0.2.0" +[[projects]] + digest = "1:b563eec078077ba5cedc795462cbd6d7c75a106a4d2e3d02940093c80e28de28" + name = "github.com/grpc-ecosystem/grpc-gateway" + packages = [ + "runtime", + "runtime/internal", + "utilities", + ] + pruneopts = "" + revision = "92583770e3f01b09a0d3e9bdf64321d8bebd48f2" + version = "v1.4.1" + [[projects]] branch = "master" digest = "1:139bdc2c89779b8ff8b1150be28f889b0ed964e6da96f32cbc9035bd4642881c" @@ -292,12 +315,12 @@ revision = "bf9dde6d0d2c004a008c27aaee91170c786f6db8" [[projects]] - digest = "1:302c6eb8e669c997bec516a138b8fc496018faa1ece4c13e445a2749fbe079bb" + digest = "1:7ab38c15bd21e056e3115c8b526d201eaf74e0308da9370997c6b3c187115d36" name = "github.com/imdario/mergo" packages = ["."] pruneopts = "" - revision = "9316a62528ac99aaecb4e47eadd6dc8aa6533d58" - version = "v0.3.5" + revision = "9f23e2d6bd2a77f959b2bf6acdbefd708a83a4a4" + version = "v0.3.6" [[projects]] digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" @@ -324,12 +347,12 @@ revision = "8eab2debe79d12b7bd3d10653910df25fa9552ba" [[projects]] - digest = "1:53ac4e911e12dde0ab68655e2006449d207a5a681f084974da2b06e5dbeaca72" + digest = "1:b79fc583e4dc7055ed86742e22164ac41bf8c0940722dbcb600f1a3ace1a8cb5" name = "github.com/json-iterator/go" packages = ["."] pruneopts = "" - revision = "ab8a2e0c74be9d3be70b3184d9acc634935ded82" - version = "1.1.4" + revision = "1624edc4454b8682399def8740d46db5e4362ba4" + version = "1.1.5" [[projects]] digest = "1:7fe04787f53bb61c1ba9c659b1a90ee3da16b4d6a1c41566bcb5077efbd30f97" @@ -341,7 +364,7 @@ [[projects]] branch = "master" - digest = "1:a08d7acad9a44f9aff9571a4ed9e5661169650ef3e8aaccb2086d73cfd6cb984" + digest = "1:e977ed7b0619844e394c4e725d008ade0840f1882c500a66e797b98bde70cf87" name = "github.com/mailru/easyjson" packages = [ "buffer", @@ -349,7 +372,15 @@ "jwriter", ] pruneopts = "" - revision = "d5012789d6659eeed305f54c1b1542e7b65829e6" + revision = "03f2033d19d5860aef995fe360ac7d395cd8ce65" + +[[projects]] + digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" + name = "github.com/matttproud/golang_protobuf_extensions" + packages = ["pbutil"] + pruneopts = "" + revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" + version = "v1.0.1" [[projects]] branch = "master" @@ -377,11 +408,11 @@ [[projects]] branch = "master" - digest = "1:99651e95333755cbe5c9768c1b80031300acca64a80870b40309202b32585a5a" + digest = "1:83854f6b1d2ce047b69657e3a87ba7602f4c5505e8bdfd02ab857db8e983bde1" name = "github.com/mitchellh/go-homedir" packages = ["."] pruneopts = "" - revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" + revision = "58046073cbffe2f25d425fe1331102f55cf719de" [[projects]] digest = "1:0c0ff2a89c1bb0d01887e1dac043ad7efbf3ec77482ef058ac423d13497e16fd" @@ -431,6 +462,50 @@ revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" +[[projects]] + digest = "1:4142d94383572e74b42352273652c62afec5b23f325222ed09198f46009022d1" + name = "github.com/prometheus/client_golang" + packages = [ + "prometheus", + "prometheus/promhttp", + ] + pruneopts = "" + revision = "c5b7fccd204277076155f10851dad72b76a49317" + version = "v0.8.0" + +[[projects]] + branch = "master" + digest = "1:185cf55b1f44a1bf243558901c3f06efa5c64ba62cfdcbb1bf7bbe8c3fb68561" + name = "github.com/prometheus/client_model" + packages = ["go"] + pruneopts = "" + revision = "5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f" + +[[projects]] + branch = "master" + digest = "1:f477ef7b65d94fb17574fc6548cef0c99a69c1634ea3b6da248b63a61ebe0498" + name = "github.com/prometheus/common" + packages = [ + "expfmt", + "internal/bitbucket.org/ww/goautoneg", + "model", + ] + pruneopts = "" + revision = "c7de2306084e37d54b8be01f3541a8464345e9a5" + +[[projects]] + branch = "master" + digest = "1:e04aaa0e8f8da0ed3d6c0700bd77eda52a47f38510063209d72d62f0ef807d5e" + name = "github.com/prometheus/procfs" + packages = [ + ".", + "internal/util", + "nfs", + "xfs", + ] + pruneopts = "" + revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92" + [[projects]] digest = "1:3962f553b77bf6c03fc07cd687a22dd3b00fe11aa14d31194f5505f5bb65cdc8" name = "github.com/sergi/go-diff" @@ -498,12 +573,12 @@ version = "v1.2.2" [[projects]] - digest = "1:e139a0dfe24e723193005b291ed82a975041718cfcab9136aa6c9540df70a4ff" + digest = "1:3ddca2bd5496c6922a2a9e636530e178a43c2a534ea6634211acdc7d10222794" name = "github.com/tidwall/gjson" packages = ["."] pruneopts = "" - revision = "f123b340873a0084cb27267eddd8ff615115fbff" - version = "v1.1.2" + revision = "1e3f6aeaa5bad08d777ea7807b279a07885dd8b2" + version = "v1.1.3" [[projects]] branch = "master" @@ -569,7 +644,7 @@ [[projects]] branch = "master" - digest = "1:cae234a803b78380e4d769db6036b9fcc8c08ed4ff862571ffc1a958edc1f629" + digest = "1:53c4b75f22ea7757dea07eae380ea42de547ae6865a5e3b41866754a8a8219c9" name = "golang.org/x/crypto" packages = [ "cast5", @@ -591,11 +666,11 @@ "ssh/terminal", ] pruneopts = "" - revision = "c126467f60eb25f8f27e5a981f32a87e3965053f" + revision = "f027049dab0ad238e394a753dba2d14753473a04" [[projects]] branch = "master" - digest = "1:96d281cfaaa12ac602772da38ac85f00d59b1d3aa7bfe69d8ba334d6ee41e3e6" + digest = "1:67c2d940f2d5c017ef88e9847709dca9b38d5fe82f1e33fb42ace515219f22f1" name = "golang.org/x/net" packages = [ "context", @@ -606,7 +681,7 @@ "idna", ] pruneopts = "" - revision = "3673e40ba22529d22c3fd7c93e97b0ce50fa7bdd" + revision = "f9ce57c11b242f0f1599cf25c89d8cb02c45295a" [[projects]] branch = "master" @@ -632,14 +707,14 @@ [[projects]] branch = "master" - digest = "1:0e1506f3caef942bf9d08a91ebbe8bd87d263e7e440f8e0427b2f85b44eac3f7" + digest = "1:6d9c86494d97c7fc8bbab029c17fc0ce9dc517aaae92a25d790d01b0e8732832" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "e072cadbbdc8dd3d3ffa82b8b4b9304c261d9311" + revision = "904bdc257025c7b3f43c19360ad3ab85783fad78" [[projects]] digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" @@ -675,7 +750,7 @@ [[projects]] branch = "master" - digest = "1:e998e532fc19ae78db55a97f0d761c98cc20c621c2965abed644c3a984d5da76" + digest = "1:c73b8c7b4bfb2e69de55a3549d6a8089d7757899cc5b62ff1c180bd76e9ee7f6" name = "golang.org/x/tools" packages = [ "go/ast/astutil", @@ -683,7 +758,7 @@ "internal/fastwalk", ] pruneopts = "" - revision = "475b7a1e12f029fbb4d210958352057952ea9f7a" + revision = "ca6481ae56504398949d597084558e50ad07117a" [[projects]] digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" @@ -705,6 +780,27 @@ revision = "b1f26356af11148e710935ed1ac8a7f5702c7612" version = "v1.1.0" +[[projects]] + branch = "master" + digest = "1:c7ecd434ece8887311c33ea3c731e2fb42f43092a43545e350b493b9dcb023b2" + name = "google.golang.org/genproto" + packages = ["googleapis/rpc/status"] + pruneopts = "" + revision = "daca94659cb50e9f37c1b834680f2e46358f10b0" + +[[projects]] + digest = "1:ca75b3775a5d4e5d1fb48f57ef0865b4aaa8b3f00e6b52be68db991c4594e0a7" + name = "google.golang.org/grpc" + packages = [ + "codes", + "grpclog", + "metadata", + "status", + ] + pruneopts = "" + revision = "32fb0ac620c32ba40a4626ddf94d90d12cce3455" + version = "v1.14.0" + [[projects]] digest = "1:75fb3fcfc73a8c723efde7777b40e8e8ff9babf30d8c56160d01beffea8a95a6" name = "gopkg.in/inf.v0" @@ -1034,7 +1130,7 @@ [[projects]] branch = "master" - digest = "1:38d0c4ac62d52d18f671b436ad51d780f078d9ebeb43a5c43a0375473b34136f" + digest = "1:74eb4556b4379d0d76a3a5ada504ff6c5ef76cd85cbf1347cb649e4c1cc8ca9e" name = "k8s.io/gengo" packages = [ "args", @@ -1046,29 +1142,29 @@ "types", ] pruneopts = "" - revision = "906d99f89cd644eecf75ab547b29bf9f876f0b59" + revision = "c42f3cdacc394f43077ff17e327d1b351c0304e4" [[projects]] branch = "master" - digest = "1:526095379da1098c3f191a0008cc59c9bf9927492e63da7689e5de424219c162" + digest = "1:951bc2047eea6d316a17850244274554f26fd59189360e45f4056b424dadf2c1" name = "k8s.io/kube-openapi" packages = [ "pkg/common", "pkg/util/proto", ] pruneopts = "" - revision = "d8ea2fe547a448256204cfc68dfee7b26c720acb" + revision = "e3762e86a74c878ffed47484592986685639c2cd" [[projects]] - digest = "1:7d2828223fc12b07a19beee8635afd08ad6a47d5393d21f1febfef881ef9eb26" + digest = "1:0942d7281ddd72af6e4bcec2162c05deac2069a4163d0fbdc97039f1b5d7324a" name = "k8s.io/kubernetes" packages = [ "pkg/apis/core", "pkg/kubectl/scheme", ] pruneopts = "" - revision = "32ac1c9073b132b8ba18aa830f46b77dcceb0723" - version = "v1.10.5" + revision = "a21fdbd78dde8f5447f5f6c331f7eb6f80bd684e" + version = "v1.10.6" [solve-meta] analyzer-name = "dep" @@ -1093,6 +1189,8 @@ "github.com/minio/minio-go", "github.com/minio/minio-go/pkg/credentials", "github.com/pkg/errors", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/sirupsen/logrus", "github.com/spf13/cobra", "github.com/stretchr/testify/assert", @@ -1102,6 +1200,7 @@ "github.com/valyala/fasttemplate", "github.com/yudai/gojsondiff/formatter", "golang.org/x/crypto/ssh", + "golang.org/x/net/context", "golang.org/x/sync/errgroup", "gopkg.in/src-d/go-git.v4", "gopkg.in/src-d/go-git.v4/plumbing", diff --git a/Gopkg.toml b/Gopkg.toml index 4e424bde0341..b4856661acd4 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -62,3 +62,7 @@ required = [ [[constraint]] name = "github.com/Knetic/govaluate" revision = "9aa49832a739dcd78a5542ff189fb82c3e423116" + +[[constraint]] + name = "github.com/prometheus/client_golang" + version = "0.8.0" diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 4f0c3009952e..b00aee7040fb 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -70,6 +70,8 @@ func NewRootCommand() *cobra.Command { ctx, cancel := context.WithCancel(context.Background()) defer cancel() go wfController.Run(ctx, 8, 8) + go wfController.MetricsServer(ctx) + go wfController.TelemetryServer(ctx) // Wait forever select {} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 92bd8718d3e8..50b04efdc90c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -14,6 +14,7 @@ import ( wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" unstructutil "github.com/argoproj/argo/util/unstructured" "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/metrics" "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" apiv1 "k8s.io/api/core/v1" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers/internalinterfaces" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -78,11 +80,16 @@ type WorkflowControllerConfig struct { InstanceID string `json:"instanceID,omitempty"` MatchLabels map[string]string `json:"matchLabels,omitempty"` + + MetricsConfig metrics.PrometheusConfig `json:"metricsConfig,omitempty"` + + TelemetryConfig metrics.PrometheusConfig `json:"telemetryConfig,omitempty"` } const ( - workflowResyncPeriod = 20 * time.Minute - podResyncPeriod = 30 * time.Minute + workflowResyncPeriod = 20 * time.Minute + workflowMetricsResyncPeriod = 1 * time.Minute + podResyncPeriod = 30 * time.Minute ) // ArtifactRepository represents a artifact repository in which a controller will store its artifacts @@ -121,6 +128,24 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int return &wfc } +// MetricsServer starts a prometheus metrics server if enabled in the configmap +func (wfc *WorkflowController) MetricsServer(ctx context.Context) { + if wfc.Config.MetricsConfig.Enabled { + informer := wfc.newWorkflowInformer(workflowMetricsResyncPeriod, wfc.tweakWorkflowMetricslist) + go informer.Run(ctx.Done()) + registry := metrics.NewWorkflowRegistry(informer) + metrics.RunServer(ctx, wfc.Config.MetricsConfig, registry) + } +} + +// TelemetryServer starts a prometheus telemetry server if enabled in the configmap +func (wfc *WorkflowController) TelemetryServer(ctx context.Context) { + if wfc.Config.TelemetryConfig.Enabled { + registry := metrics.NewTelemetryRegistry() + metrics.RunServer(ctx, wfc.Config.TelemetryConfig, registry) + } +} + // Run starts an Workflow resource controller func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers int) { defer wfc.wfQueue.ShutDown() @@ -134,8 +159,10 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in return } - wfc.wfInformer = wfc.newWorkflowInformer() + wfc.wfInformer = wfc.newWorkflowInformer(workflowResyncPeriod, wfc.tweakWorkflowlist) + wfc.addWorkflowInformerHandler() wfc.podInformer = wfc.newPodInformer() + go wfc.wfInformer.Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.podLabeler(ctx.Done()) @@ -345,12 +372,19 @@ func (wfc *WorkflowController) tweakWorkflowlist(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() } +func (wfc *WorkflowController) tweakWorkflowMetricslist(options *metav1.ListOptions) { + options.FieldSelector = fields.Everything().String() + + labelSelector := labels.NewSelector().Add(wfc.instanceIDRequirement()) + options.LabelSelector = labelSelector.String() +} + // newWorkflowInformer returns the workflow informer used by the controller. This is actually // a custom built UnstructuredInformer which is in actuality returning unstructured.Unstructured // objects. We no longer return WorkflowInformer due to: // https://github.com/kubernetes/kubernetes/issues/57705 // https://github.com/argoproj/argo/issues/632 -func (wfc *WorkflowController) newWorkflowInformer() cache.SharedIndexInformer { +func (wfc *WorkflowController) newWorkflowInformer(resyncPeriod time.Duration, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { dynClientPool := dynamic.NewDynamicClientPool(wfc.restConfig) dclient, err := dynClientPool.ClientForGroupVersionKind(wfv1.SchemaGroupVersionKind) if err != nil { @@ -368,11 +402,15 @@ func (wfc *WorkflowController) newWorkflowInformer() cache.SharedIndexInformer { resource, dclient, wfc.Config.Namespace, - workflowResyncPeriod, + resyncPeriod, cache.Indexers{}, - wfc.tweakWorkflowlist, + tweakListOptions, ) - informer.AddEventHandler( + return informer +} + +func (wfc *WorkflowController) addWorkflowInformerHandler() { + wfc.wfInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) @@ -396,7 +434,6 @@ func (wfc *WorkflowController) newWorkflowInformer() cache.SharedIndexInformer { }, }, ) - return informer } func (wfc *WorkflowController) watchControllerConfigMap(ctx context.Context) (cache.Controller, error) { diff --git a/workflow/metrics/collector.go b/workflow/metrics/collector.go new file mode 100644 index 000000000000..b282103d1fb3 --- /dev/null +++ b/workflow/metrics/collector.go @@ -0,0 +1,155 @@ +package metrics + +import ( + "os" + "strings" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +var ( + descWorkflowDefaultLabels = []string{"namespace", "name"} + + descWorkflowInfo = prometheus.NewDesc( + "kube_wf_info", + "Information about workflow.", + append(descWorkflowDefaultLabels, "entrypoint", "service_account_name", "templates"), + nil, + ) + descWorkflowStartedAt = prometheus.NewDesc( + "kube_wf_start_time", + "Start time in unix timestamp for a workflow.", + descWorkflowDefaultLabels, + nil, + ) + descWorkflowFinishedAt = prometheus.NewDesc( + "kube_wf_completion_time", + "Completion time in unix timestamp for a workflow.", + descWorkflowDefaultLabels, + nil, + ) + descWorkflowCreated = prometheus.NewDesc( + "kube_wf_created", + "Creation time in unix timestamp for a workflow.", + descWorkflowDefaultLabels, + nil, + ) + descWorkflowStatusPhase = prometheus.NewDesc( + "kube_wf_status_phase", + "The workflow current phase.", + append(descWorkflowDefaultLabels, "phase"), + nil, + ) +) + +func boolFloat64(b bool) float64 { + if b { + return 1 + } + return 0 +} + +type workflowLister func() ([]wfv1.Workflow, error) + +func (l workflowLister) List() ([]wfv1.Workflow, error) { + return l() +} + +type wfStore interface { + List() (workflows []wfv1.Workflow, err error) +} + +// workflowCollector collects metrics about all workflows in the cluster +type workflowCollector struct { + store wfStore +} + +// NewWorkflowRegistry creates a new prometheus registry that collects workflows +func NewWorkflowRegistry(informer cache.SharedIndexInformer) *prometheus.Registry { + workflowLister := workflowLister(func() (workflows []wfv1.Workflow, err error) { + for _, m := range informer.GetStore().List() { + var wf wfv1.Workflow + err := runtime.DefaultUnstructuredConverter.FromUnstructured(m.(*unstructured.Unstructured).Object, &wf) + if err != nil { + return nil, err + } + workflows = append(workflows, wf) + } + return workflows, nil + }) + registry := prometheus.NewRegistry() + registry.MustRegister(&workflowCollector{store: workflowLister}) + return registry +} + +// NewTelemetryRegistry creates a new prometheus registry that collects telemetry +func NewTelemetryRegistry() *prometheus.Registry { + registry := prometheus.NewRegistry() + registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), "")) + registry.MustRegister(prometheus.NewGoCollector()) + return registry +} + +// Describe implements the prometheus.Collector interface +func (wc *workflowCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- descWorkflowInfo + ch <- descWorkflowStartedAt + ch <- descWorkflowFinishedAt + ch <- descWorkflowCreated + ch <- descWorkflowStatusPhase +} + +// Collect implements the prometheus.Collector interface +func (wc *workflowCollector) Collect(ch chan<- prometheus.Metric) { + workflows, err := wc.store.List() + if err != nil { + return + } + for _, wf := range workflows { + wc.collectWorkflow(ch, wf) + } +} + +func (wc *workflowCollector) collectWorkflow(ch chan<- prometheus.Metric, wf wfv1.Workflow) { + addConstMetric := func(desc *prometheus.Desc, t prometheus.ValueType, v float64, lv ...string) { + lv = append([]string{wf.Namespace, wf.Name}, lv...) + ch <- prometheus.MustNewConstMetric(desc, t, v, lv...) + } + addGauge := func(desc *prometheus.Desc, v float64, lv ...string) { + addConstMetric(desc, prometheus.GaugeValue, v, lv...) + } + joinTemplates := func(spec []wfv1.Template) string { + var templates []string + for _, t := range spec { + templates = append(templates, t.Name) + } + return strings.Join(templates, ",") + } + + addGauge(descWorkflowInfo, 1, wf.Spec.Entrypoint, wf.Spec.ServiceAccountName, joinTemplates(wf.Spec.Templates)) + + if phase := wf.Status.Phase; phase != "" { + addGauge(descWorkflowStatusPhase, boolFloat64(phase == wfv1.NodeRunning), string(wfv1.NodeRunning)) + addGauge(descWorkflowStatusPhase, boolFloat64(phase == wfv1.NodeSucceeded), string(wfv1.NodeSucceeded)) + addGauge(descWorkflowStatusPhase, boolFloat64(phase == wfv1.NodeSkipped), string(wfv1.NodeSkipped)) + addGauge(descWorkflowStatusPhase, boolFloat64(phase == wfv1.NodeFailed), string(wfv1.NodeFailed)) + addGauge(descWorkflowStatusPhase, boolFloat64(phase == wfv1.NodeError), string(wfv1.NodeError)) + } + + if !wf.CreationTimestamp.IsZero() { + addGauge(descWorkflowCreated, float64(wf.CreationTimestamp.Unix())) + } + + if !wf.Status.StartedAt.IsZero() { + addGauge(descWorkflowStartedAt, float64(wf.Status.StartedAt.Unix())) + } + + if !wf.Status.FinishedAt.IsZero() { + addGauge(descWorkflowFinishedAt, float64(wf.Status.FinishedAt.Unix())) + } + +} diff --git a/workflow/metrics/server.go b/workflow/metrics/server.go new file mode 100644 index 000000000000..65153e23a044 --- /dev/null +++ b/workflow/metrics/server.go @@ -0,0 +1,37 @@ +package metrics + +import ( + "context" + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" +) + +// PrometheusConfig defines a config for a metrics server +type PrometheusConfig struct { + Enabled bool `json:"enabled,omitempty"` + Path string `json:"path,omitempty"` + Port string `json:"port,omitempty"` +} + +// RunServer starts a metrics server +func RunServer(ctx context.Context, config PrometheusConfig, registry *prometheus.Registry) { + mux := http.NewServeMux() + mux.Handle(config.Path, promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + srv := &http.Server{Addr: config.Port, Handler: mux} + + defer func() { + if cerr := srv.Close(); cerr != nil { + log.Fatalf("Encountered an '%s' error when tried to close the metrics server running on '%s'", cerr, config.Port) + } + }() + + log.Infof("Starting prometheus metrics server at 0.0.0.0%s%s", config.Port, config.Path) + if err := srv.ListenAndServe(); err != nil { + panic(err) + } + + <-ctx.Done() +}