Skip to content

Commit

Permalink
Add kube_cronjob tag to job metrics when applicable (#11692)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-mez authored Mar 17, 2022
1 parent 291903b commit b21da7f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def get_clustername():
'image': 'image_name',
}

# Matches a trailing "-<digits>" suffix (a dash followed by 4 to 10 digits at the
# very end of the string). Used to strip that suffix from job names.
# NOTE(review): the suffix is presumably the schedule timestamp that CronJobs
# append to the jobs they create -- confirm against the Kubernetes naming scheme.
JOB_NAME_PATTERN = r"(-\d{4,10}$)"


class KubernetesState(OpenMetricsBaseCheck):
"""
Expand Down Expand Up @@ -146,6 +148,9 @@ def __init__(self, name, init_config, instances):
self.job_succeeded_count = defaultdict(int)
self.job_failed_count = defaultdict(int)

# Regex to extract cronjob from job names
self._job_name_re = re.compile(JOB_NAME_PATTERN)

def check(self, instance):
endpoint = instance.get('kube_state_url')

Expand Down Expand Up @@ -561,12 +566,23 @@ def _label_to_tags(self, name, labels, scraper_config, tag_name=None):
tags += self._build_tags(tag_name or name, value, scraper_config)
return tags

def _get_job_tags(self, lname, lvalue, scraper_config):
    """
    Build the list of tags for a job label.

    The job name in `lvalue` is first stripped of its numeric suffix (see
    `_trim_job_tag`), and tags for `lname` are built from the stripped name.
    When a suffix was actually removed, tags for 'kube_cronjob' carrying the
    same stripped name are appended as well.

    :param lname: label name the tags are built for (callers pass 'job' or 'job_name')
    :param lvalue: raw job name taken from the label value
    :param scraper_config: scraper configuration forwarded to `_build_tags`
    :return: list of tags
    """
    base_name, is_cronjob = self._trim_job_tag(lvalue)
    job_tags = self._build_tags(lname, base_name, scraper_config)
    if not is_cronjob:
        # Nothing was stripped: no cronjob tag to add.
        return job_tags
    job_tags.extend(self._build_tags('kube_cronjob', base_name, scraper_config))
    return job_tags

def _trim_job_tag(self, name):
"""
Trims suffix of job names if they match -(\\d{4,10}$)
Returns the trimmed name and a boolean indicating whether the name was trimmed.
"""
pattern = r"(-\d{4,10}$)"
return re.sub(pattern, '', name)
trimmed = self._job_name_re.sub('', name)
return trimmed, trimmed != name

def _extract_job_timestamp(self, name):
"""
Expand Down Expand Up @@ -672,8 +688,7 @@ def kube_job_complete(self, metric, scraper_config):
tags = []
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags += self._build_tags(label_name, trimmed_job, scraper_config)
tags += self._get_job_tags(label_name, label_value, scraper_config)
else:
tags += self._build_tags(label_name, label_value, scraper_config)
self.service_check(service_check_name, self.OK, tags=tags + scraper_config['custom_tags'])
Expand All @@ -684,8 +699,7 @@ def kube_job_failed(self, metric, scraper_config):
tags = []
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags += self._build_tags(label_name, trimmed_job, scraper_config)
tags += self._get_job_tags(label_name, label_value, scraper_config)
else:
tags += self._build_tags(label_name, label_value, scraper_config)
self.service_check(service_check_name, self.CRITICAL, tags=tags + scraper_config['custom_tags'])
Expand All @@ -696,9 +710,8 @@ def kube_job_status_failed(self, metric, scraper_config):
tags = [] + scraper_config['custom_tags']
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags += self._get_job_tags(label_name, label_value, scraper_config)
job_ts = self._extract_job_timestamp(label_value)
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags += self._build_tags(label_name, label_value, scraper_config)
if job_ts is not None: # if there is a timestamp, this is a Cron Job
Expand All @@ -714,9 +727,8 @@ def kube_job_status_succeeded(self, metric, scraper_config):
tags = [] + scraper_config['custom_tags']
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags += self._get_job_tags(label_name, label_value, scraper_config)
job_ts = self._extract_job_timestamp(label_value)
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags += self._build_tags(label_name, label_value, scraper_config)
if job_ts is not None: # if there is a timestamp, this is a Cron Job
Expand Down
64 changes: 57 additions & 7 deletions kubernetes_state/tests/test_kubernetes_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ def test_join_standard_tags_labels(aggregator, instance, check_with_join_standar
tags=[
'job_name:curl-cron-job',
'kube_job:curl-cron-job',
'kube_cronjob:curl-cron-job',
'kube_namespace:default',
'namespace:default',
'optional:tag1',
Expand Down Expand Up @@ -726,12 +727,26 @@ def test_job_counts(aggregator, instance):
# Test cron jobs
aggregator.assert_metric(
NAMESPACE + '.job.failed',
tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'kube_job:hello',
'kube_cronjob:hello',
'job:hello',
'optional:tag1',
],
value=0,
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded',
tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'kube_job:hello',
'kube_cronjob:hello',
'job:hello',
'optional:tag1',
],
value=3,
)

Expand All @@ -753,12 +768,26 @@ def test_job_counts(aggregator, instance):
# Test cron jobs
aggregator.assert_metric(
NAMESPACE + '.job.failed',
tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'kube_job:hello',
'kube_cronjob:hello',
'job:hello',
'optional:tag1',
],
value=0,
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded',
tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'kube_job:hello',
'kube_cronjob:hello',
'job:hello',
'optional:tag1',
],
value=3,
)

Expand Down Expand Up @@ -792,12 +821,26 @@ def test_job_counts(aggregator, instance):
check.check(instance)
aggregator.assert_metric(
NAMESPACE + '.job.failed',
tags=['namespace:default', 'kube_namespace:default', 'job:hello', 'kube_job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'job:hello',
'kube_job:hello',
'kube_cronjob:hello',
'optional:tag1',
],
value=1,
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded',
tags=['namespace:default', 'kube_namespace:default', 'job:hello', 'kube_job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'job:hello',
'kube_job:hello',
'kube_cronjob:hello',
'optional:tag1',
],
value=4,
)

Expand Down Expand Up @@ -831,7 +874,14 @@ def test_job_counts(aggregator, instance):
check.check(instance)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded',
tags=['namespace:default', 'kube_namespace:default', 'job:hello', 'kube_job:hello', 'optional:tag1'],
tags=[
'namespace:default',
'kube_namespace:default',
'job:hello',
'kube_job:hello',
'kube_cronjob:hello',
'optional:tag1',
],
value=5,
)

Expand Down

0 comments on commit b21da7f

Please sign in to comment.