Skip to content

Commit

Permalink
{taskrun,pipelinerun}metrics: make sure config is up-to-date
Browse files Browse the repository at this point in the history
This updates some metrics package and struct to be able to
keep up-to-date the metrics configuration in the background go
routines that are used.

Signed-off-by: Vincent Demeester <[email protected]>
  • Loading branch information
vdemeester authored and tekton-robot committed Aug 9, 2024
1 parent 9cf9672 commit 3429744
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 25 deletions.
26 changes: 20 additions & 6 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
type Recorder struct {
mutex sync.Mutex
initialized bool
cfg *config.Metrics

insertTag func(pipeline,
pipelinerun string) []tag.Mutator
Expand Down Expand Up @@ -261,8 +262,8 @@ func viewUnregister() {
runningPRsWaitingOnTaskResolutionView)
}

// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string,
value interface{}) {
return func(name string, value interface{}) {
if name == config.GetMetricsConfigName() {
Expand All @@ -271,6 +272,8 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
// Update metrics according to configuration
viewUnregister()
err := viewRegister(cfg)
if err != nil {
Expand All @@ -282,8 +285,10 @@ func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
}

func pipelinerunInsertTag(pipeline, pipelinerun string) []tag.Mutator {
return []tag.Mutator{tag.Insert(pipelineTag, pipeline),
tag.Insert(pipelinerunTag, pipelinerun)}
return []tag.Mutator{
tag.Insert(pipelineTag, pipeline),
tag.Insert(pipelinerunTag, pipelinerun),
}
}

func pipelineInsertTag(pipeline, pipelinerun string) []tag.Mutator {
Expand Down Expand Up @@ -312,6 +317,13 @@ func getPipelineTagName(pr *v1.PipelineRun) string {
return pipelineName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.cfg = cfg
}

// DurationAndCount logs the duration of PipelineRun execution and
// count for number of PipelineRuns succeed or failed
// returns an error if its failed to log the metrics
Expand Down Expand Up @@ -351,8 +363,10 @@ func (r *Recorder) DurationAndCount(pr *v1.PipelineRun, beforeCondition *apis.Co

ctx, err := tag.New(
context.Background(),
append([]tag.Mutator{tag.Insert(namespaceTag, pr.Namespace),
tag.Insert(statusTag, status), tag.Insert(reasonTag, reason)}, r.insertTag(pipelineName, pr.Name)...)...)
append([]tag.Mutator{
tag.Insert(namespaceTag, pr.Namespace),
tag.Insert(statusTag, status), tag.Insert(reasonTag, reason),
}, r.insertTag(pipelineName, pr.Name)...)...)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestUninitializedMetrics(t *testing.T) {
}
}

func TestMetricsOnStore(t *testing.T) {
func TestOnStore(t *testing.T) {
log := zap.NewExample()
defer log.Sync()
logger := log.Sugar()
Expand All @@ -81,7 +81,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We check that there's no change when incorrect config is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{})
OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{})
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -94,7 +94,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(pipelinerunInsertTag).Pointer() {
t.Fatal("metrics recorder shouldn't change during this OnStore call")
}
Expand All @@ -105,7 +105,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationTaskrunType: config.DurationTaskrunTypeHistogram,
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Fatal("metrics recorder didn't change during OnStore call")
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
secretinformer := secretinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
pipelinerunmetricsRecorder := pipelinerunmetrics.Get(ctx)
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore := config.NewStore(logger.Named("config-store"),
pipelinerunmetrics.OnStore(logger, pipelinerunmetricsRecorder),
tracerProvider.OnStore(secretinformer.Lister()),
)
configStore.WatchConfigs(cmw)

c := &Reconciler{
Expand All @@ -76,7 +80,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
customRunLister: customRunInformer.Lister(),
verificationPolicyLister: verificationpolicyInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: pipelinerunmetrics.Get(ctx),
metrics: pipelinerunmetricsRecorder,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()),
tracerProvider: tracerProvider,
Expand Down
9 changes: 7 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
secretinformer := secretinformer.Get(ctx)
spireClient := spire.GetControllerAPIClient(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
taskrunmetricsRecorder := taskrunmetrics.Get(ctx)
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore := config.NewStore(logger.Named("config-store"),
taskrunmetrics.OnStore(logger, taskrunmetricsRecorder),
spire.OnStore(ctx, logger),
tracerProvider.OnStore(secretinformer.Lister()),
)
configStore.WatchConfigs(cmw)

entrypointCache, err := pod.NewEntrypointCache(kubeclientset)
Expand All @@ -84,7 +89,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
limitrangeLister: limitrangeInformer.Lister(),
verificationPolicyLister: verificationpolicyInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
metrics: taskrunmetricsRecorder,
entrypointCache: entrypointCache,
podLister: podInformer.Lister(),
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
Expand Down
22 changes: 15 additions & 7 deletions pkg/taskrunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var (
type Recorder struct {
mutex sync.Mutex
initialized bool
cfg *config.Metrics

ReportingPeriod time.Duration

Expand All @@ -144,15 +145,15 @@ var (
// to log the TaskRun related metrics
func NewRecorder(ctx context.Context) (*Recorder, error) {
once.Do(func() {
cfg := config.FromContextOrDefaults(ctx)
r = &Recorder{
initialized: true,
cfg: cfg.Metrics,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

cfg := config.FromContextOrDefaults(ctx)

errRegistering = viewRegister(cfg.Metrics)
if errRegistering != nil {
r.initialized = false
Expand Down Expand Up @@ -325,16 +326,17 @@ func viewUnregister() {
)
}

// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func MetricsOnStore(logger *zap.SugaredLogger) func(name string,
value interface{}) {
// OnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
func OnStore(logger *zap.SugaredLogger, r *Recorder) func(name string, value interface{}) {
return func(name string, value interface{}) {
if name == config.GetMetricsConfigName() {
cfg, ok := value.(*config.Metrics)
if !ok {
logger.Error("Failed to do type insertion for extracting metrics config")
return
}
r.updateConfig(cfg)
// Update metrics according to the configuration
viewUnregister()
err := viewRegister(cfg)
if err != nil {
Expand Down Expand Up @@ -389,6 +391,13 @@ func getTaskTagName(tr *v1.TaskRun) string {
return taskName
}

func (r *Recorder) updateConfig(cfg *config.Metrics) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.cfg = cfg
}

// DurationAndCount logs the duration of TaskRun execution and
// count for number of TaskRuns succeed or failed
// returns an error if its failed to log the metrics
Expand Down Expand Up @@ -454,8 +463,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi
return err
}

cfg := config.FromContextOrDefaults(ctx)
addNamespaceLabelToQuotaThrottleMetric := cfg.Metrics != nil && cfg.Metrics.ThrottleWithNamespace
addNamespaceLabelToQuotaThrottleMetric := r.cfg != nil && r.cfg.ThrottleWithNamespace

var runningTrs int
trsThrottledByQuota := map[string]int{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/taskrunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestUninitializedMetrics(t *testing.T) {
}
}

func TestMetricsOnStore(t *testing.T) {
func TestOnStore(t *testing.T) {
log := zap.NewExample()
defer log.Sync()
logger := log.Sugar()
Expand All @@ -92,7 +92,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We check that there's no change when incorrect config is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), &config.Store{})
OnStore(logger, metrics)(config.GetMetricsConfigName(), &config.Store{})
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() {
t.Fatalf("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -107,7 +107,7 @@ func TestMetricsOnStore(t *testing.T) {
}

// We test that there's no change when incorrect values in configmap is passed
MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
// Comparing function assign to struct with the one which should yield same value
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(taskrunInsertTag).Pointer() {
t.Fatalf("metrics recorder shouldn't change during this OnStore call")
Expand All @@ -121,7 +121,7 @@ func TestMetricsOnStore(t *testing.T) {
DurationPipelinerunType: config.DurationPipelinerunTypeLastValue,
}

MetricsOnStore(logger)(config.GetMetricsConfigName(), cfg)
OnStore(logger, metrics)(config.GetMetricsConfigName(), cfg)
if reflect.ValueOf(metrics.insertTaskTag).Pointer() != reflect.ValueOf(nilInsertTag).Pointer() {
t.Fatalf("metrics recorder didn't change during OnStore call")
}
Expand Down

0 comments on commit 3429744

Please sign in to comment.