From 3429744a7b3f6bdd5306d655944a4da6a70ceb46 Mon Sep 17 00:00:00 2001 From: Vincent Demeester Date: Wed, 7 Aug 2024 17:27:05 +0200 Subject: [PATCH] {taskrun,pipelinerun}metrics: make sure config is up-to-date This updates some metrics package and struct to be able to keep up-to-date the metrics configuration in the background go routines that are used. Signed-off-by: Vincent Demeester --- pkg/pipelinerunmetrics/metrics.go | 26 ++++++++++++++++++------ pkg/pipelinerunmetrics/metrics_test.go | 8 ++++---- pkg/reconciler/pipelinerun/controller.go | 8 ++++++-- pkg/reconciler/taskrun/controller.go | 9 ++++++-- pkg/taskrunmetrics/metrics.go | 22 +++++++++++++------- pkg/taskrunmetrics/metrics_test.go | 8 ++++---- 6 files changed, 56 insertions(+), 25 deletions(-) diff --git a/pkg/pipelinerunmetrics/metrics.go b/pkg/pipelinerunmetrics/metrics.go index 3b6afcfa523..d528681db9f 100644 --- a/pkg/pipelinerunmetrics/metrics.go +++ b/pkg/pipelinerunmetrics/metrics.go @@ -105,6 +105,7 @@ const ( type Recorder struct { mutex sync.Mutex initialized bool + cfg *config.Metrics insertTag func(pipeline, pipelinerun string) []tag.Mutator @@ -261,8 +262,8 @@ func viewUnregister() { runningPRsWaitingOnTaskResolutionView) } -// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so -func MetricsOnStore(logger *zap.SugaredLogger) func(name string, +// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so +func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) { return func(name string, value interface{}) { if name == config.GetMetricsConfigName() { @@ -271,6 +272,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, logger.Error("Failed to do type insertion for extracting metrics config") return } + r.updateConfig(cfg) + // Update metrics according to configuration viewUnregister() err := viewRegister(cfg) if err != nil { @@ -282,8 +285,10 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, } func pipelinerunInsertTag(pipeline, pipelinerun string) []tag.Mutator { - return []tag.Mutator{tag.Insert(pipelineTag, pipeline), - tag.Insert(pipelinerunTag, pipelinerun)} + return []tag.Mutator{ + tag.Insert(pipelineTag, pipeline), + tag.Insert(pipelinerunTag, pipelinerun), + } } func pipelineInsertTag(pipeline, pipelinerun string) []tag.Mutator { @@ -312,6 +317,13 @@ func getPipelineTagName(pr *v1.PipelineRun) string { return pipelineName } +func (r *Recorder) updateConfig(cfg *config.Metrics) { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.cfg = cfg +} + // DurationAndCount logs the duration of PipelineRun execution and // count for number of PipelineRuns succeed or failed // returns an error if its failed to log the metrics @@ -351,8 +363,10 @@ func (r *Recorder) DurationAndCount(pr *v1.PipelineRun, beforeCondition *apis.Co ctx, err := tag.New( context.Background(), - append([]tag.Mutator{tag.Insert(namespaceTag, pr.Namespace), - tag.Insert(statusTag, status), tag.Insert(reasonTag, reason)}, r.insertTag(pipelineName, pr.Name)...)...) + append([]tag.Mutator{ + tag.Insert(namespaceTag, pr.Namespace), + tag.Insert(statusTag, status), tag.Insert(reasonTag, reason), + }, r.insertTag(pipelineName, pr.Name)...)...) if err != nil { return err } diff --git a/pkg/pipelinerunmetrics/metrics_test.go b/pkg/pipelinerunmetrics/metrics_test.go index cdab8b6c721..23703e64539 100644 --- a/pkg/pipelinerunmetrics/metrics_test.go +++ b/pkg/pipelinerunmetrics/metrics_test.go @@ -69,7 +69,7 @@ func TestUninitializedMetrics(t *testing.T) { } } -func TestMetricsOnStore(t *testing.T) { +func TestOnStore(t *testing.T) { log := zap.NewExample() defer log.Sync() logger := log.Sugar() @@ -81,7 +81,7 @@ func TestMetricsOnStore(t *testing.T) { } // We check that there's no change when incorrect config is passed - MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{}) + OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{}) // Comparing function assign to struct with the one which should yield same value if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() { t.Fatal("metrics recorder shouldn't change during this OnStore call") @@ -94,7 +94,7 @@ func TestMetricsOnStore(t *testing.T) { DurationTaskrunType: config.DurationTaskrunTypeHistogram, DurationPipelinerunType: config.DurationPipelinerunTypeLastValue, } - MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg) + OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg) if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() { t.Fatal("metrics recorder shouldn't change during this OnStore call") } @@ -105,7 +105,7 @@ func TestMetricsOnStore(t *testing.T) { DurationTaskrunType: config.DurationTaskrunTypeHistogram, DurationPipelinerunType: config.DurationPipelinerunTypeLastValue, } - MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg) + OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg) if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() { t.Fatal("metrics recorder didn't change during OnStore call") } diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 728cd752882..d47ef8d7760 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -62,8 +62,12 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex verificationpolicyInformer := verificationpolicyinformer.Get(ctx) secretinformer := secretinformer.Get(ctx) tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing")) + pipelinerunmetricsRecorder := pipelinerunmetrics.Get(ctx) //nolint:contextcheck // OnStore methods does not support context as a parameter - configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister())) + configStore := config.NewStore(logger.Named("config-store"), + pipelinerunmetrics.OnStore(logger, pipelinerunmetricsRecorder), + tracerProvider.OnStore(secretinformer.Lister()), + ) configStore.WatchConfigs(cmw) c := &Reconciler{ @@ -76,7 +80,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex customRunLister: customRunInformer.Lister(), verificationPolicyLister: verificationpolicyInformer.Lister(), cloudEventClient: cloudeventclient.Get(ctx), - metrics: pipelinerunmetrics.Get(ctx), + metrics: pipelinerunmetricsRecorder, pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()), tracerProvider: tracerProvider, diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 024abd29373..84ab26185d2 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -65,8 +65,13 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex secretinformer := secretinformer.Get(ctx) spireClient := spire.GetControllerAPIClient(ctx) tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing")) + taskrunmetricsRecorder := taskrunmetrics.Get(ctx) //nolint:contextcheck // OnStore methods does not support context as a parameter - configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister())) + configStore := config.NewStore(logger.Named("config-store"), + taskrunmetrics.OnStore(logger, taskrunmetricsRecorder), + spire.OnStore(ctx, logger), + tracerProvider.OnStore(secretinformer.Lister()), + ) configStore.WatchConfigs(cmw) entrypointCache, err := pod.NewEntrypointCache(kubeclientset) @@ -84,7 +89,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex limitrangeLister: limitrangeInformer.Lister(), verificationPolicyLister: verificationpolicyInformer.Lister(), cloudEventClient: cloudeventclient.Get(ctx), - metrics: taskrunmetrics.Get(ctx), + metrics: taskrunmetricsRecorder, entrypointCache: entrypointCache, podLister: podInformer.Lister(), pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), diff --git a/pkg/taskrunmetrics/metrics.go b/pkg/taskrunmetrics/metrics.go index b9c705cf7f9..2a4fa887a24 100644 --- a/pkg/taskrunmetrics/metrics.go +++ b/pkg/taskrunmetrics/metrics.go @@ -121,6 +121,7 @@ var ( type Recorder struct { mutex sync.Mutex initialized bool + cfg *config.Metrics ReportingPeriod time.Duration @@ -144,15 +145,15 @@ var ( // to log the TaskRun related metrics func NewRecorder(ctx context.Context) (*Recorder, error) { once.Do(func() { + cfg := config.FromContextOrDefaults(ctx) r = &Recorder{ initialized: true, + cfg: cfg.Metrics, // Default to reporting metrics every 30s. ReportingPeriod: 30 * time.Second, } - cfg := config.FromContextOrDefaults(ctx) - errRegistering = viewRegister(cfg.Metrics) if errRegistering != nil { r.initialized = false @@ -325,9 +326,8 @@ func viewUnregister() { ) } -// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so -func MetricsOnStore(logger *zap.SugaredLogger) func(name string, - value interface{}) { +// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so +func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) { return func(name string, value interface{}) { if name == config.GetMetricsConfigName() { cfg, ok := value.(*config.Metrics) @@ -335,6 +335,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string, logger.Error("Failed to do type insertion for extracting metrics config") return } + r.updateConfig(cfg) + // Update metrics according to the configuration viewUnregister() err := viewRegister(cfg) if err != nil { @@ -389,6 +391,13 @@ func getTaskTagName(tr *v1.TaskRun) string { return taskName } +func (r *Recorder) updateConfig(cfg *config.Metrics) { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.cfg = cfg +} + // DurationAndCount logs the duration of TaskRun execution and // count for number of TaskRuns succeed or failed // returns an error if its failed to log the metrics @@ -454,8 +463,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi return err } - cfg := config.FromContextOrDefaults(ctx) - addNamespaceLabelToQuotaThrottleMetric := cfg.Metrics != nil && cfg.Metrics.ThrottleWithNamespace + addNamespaceLabelToQuotaThrottleMetric := r.cfg != nil && r.cfg.ThrottleWithNamespace var runningTrs int trsThrottledByQuota := map[string]int{} diff --git a/pkg/taskrunmetrics/metrics_test.go b/pkg/taskrunmetrics/metrics_test.go index 94e739490ef..d2a02ef35a3 100644 --- a/pkg/taskrunmetrics/metrics_test.go +++ b/pkg/taskrunmetrics/metrics_test.go @@ -80,7 +80,7 @@ func TestUninitializedMetrics(t *testing.T) { } } -func TestMetricsOnStore(t *testing.T) { +func TestOnStore(t *testing.T) { log := zap.NewExample() defer log.Sync() logger := log.Sugar() @@ -92,7 +92,7 @@ func TestMetricsOnStore(t *testing.T) { } // We check that there's no change when incorrect config is passed - MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{}) + OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{}) // Comparing function assign to struct with the one which should yield same value if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() { t.Fatalf("metrics recorder shouldn't change during this OnStore call") @@ -107,7 +107,7 @@ func TestMetricsOnStore(t *testing.T) { } // We test that there's no change when incorrect values in configmap is passed - MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg) + OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg) // Comparing function assign to struct with the one which should yield same value if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() { t.Fatalf("metrics recorder shouldn't change during this OnStore call") @@ -121,7 +121,7 @@ func TestMetricsOnStore(t *testing.T) { DurationPipelinerunType: config.DurationPipelinerunTypeLastValue, } - MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg) + OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg) if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() { t.Fatalf("metrics recorder didn't change during OnStore call") }