Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename cluster_name tag to yarn_cluster #8579

Merged
merged 3 commits into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion yarn/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ files:
value:
example: false
type: boolean

- name: disable_legacy_cluster_tag
description: Enable to stop submitting the tag `cluster_name`, which has been renamed to `yarn_cluster`.
value:
type: boolean
example: false
- template: instances/http
- template: instances/default

Expand Down
5 changes: 5 additions & 0 deletions yarn/datadog_checks/yarn/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ instances:
#
# split_yarn_application_tags: false

## @param disable_legacy_cluster_tag - boolean - optional - default: false
## Enable to stop submitting the tag `cluster_name`, which has been renamed to `yarn_cluster`.
#
# disable_legacy_cluster_tag: false

## @param proxy - mapping - optional
## This overrides the `proxy` setting in `init_config`.
##
Expand Down
4 changes: 3 additions & 1 deletion yarn/datadog_checks/yarn/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ def check(self, instance):
)
cluster_name = DEFAULT_CLUSTER_NAME

tags.append('cluster_name:{}'.format(cluster_name))
tags.append('yarn_cluster:{}'.format(cluster_name))
if not is_affirmative(self.instance.get('disable_legacy_cluster_tag', False)):
tags.append('cluster_name:{}'.format(cluster_name))

# Get metrics from the Resource Manager
self._yarn_cluster_metrics(rm_address, tags)
Expand Down
17 changes: 9 additions & 8 deletions yarn/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"cluster_name": CLUSTER_NAME,
}

CLUSTER_TAG = "cluster_name:{}".format(CLUSTER_NAME)
LEGACY_CLUSTER_TAG = "cluster_name:{}".format(CLUSTER_NAME)
YARN_CLUSTER_TAG = "yarn_cluster:{}".format(CLUSTER_NAME)

EXPECTED_METRICS = [
"yarn.metrics.apps_submitted",
Expand Down Expand Up @@ -90,7 +91,7 @@
"yarn.node.num_containers",
]

EXPECTED_TAGS = ['cluster_name:test']
EXPECTED_TAGS = ['cluster_name:test', 'yarn_cluster:test']

YARN_CONFIG = {
'instances': [
Expand Down Expand Up @@ -225,7 +226,7 @@
'yarn.metrics.rebooted_nodes': 0,
}

YARN_CLUSTER_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME)]
YARN_CLUSTER_METRICS_TAGS = [LEGACY_CLUSTER_TAG, YARN_CLUSTER_TAG]

DEPRECATED_YARN_APP_METRICS_VALUES = {
'yarn.apps.progress': 100,
Expand All @@ -251,7 +252,7 @@
'yarn.apps.vcore_seconds_gauge': 103,
}

YARN_APP_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME), 'app_name:word count', 'app_queue:default']
YARN_APP_METRICS_TAGS = ['app_name:word count', 'app_queue:default'] + YARN_CLUSTER_METRICS_TAGS

YARN_NODE_METRICS_VALUES = {
'yarn.node.last_health_update': 1324056895432,
Expand All @@ -262,15 +263,15 @@
'yarn.node.num_containers': 0,
}

YARN_NODE_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME), 'node_id:h2:1235']
YARN_NODE_METRICS_TAGS = ['node_id:h2:1235'] + YARN_CLUSTER_METRICS_TAGS

YARN_ROOT_QUEUE_METRICS_VALUES = {
'yarn.queue.root.max_capacity': 100,
'yarn.queue.root.used_capacity': 35.012,
'yarn.queue.root.capacity': 100,
}

YARN_ROOT_QUEUE_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME), 'queue_name:root']
YARN_ROOT_QUEUE_METRICS_TAGS = ['queue_name:root'] + YARN_CLUSTER_METRICS_TAGS

YARN_QUEUE_METRICS_VALUES = {
'yarn.queue.num_pending_applications': 0,
Expand All @@ -297,6 +298,6 @@
'yarn.queue.max_applications_per_user': 5212,
}

YARN_QUEUE_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME), 'queue_name:clientqueue']
YARN_QUEUE_METRICS_TAGS = ['queue_name:clientqueue'] + YARN_CLUSTER_METRICS_TAGS

YARN_QUEUE_NOFOLLOW_METRICS_TAGS = ['cluster_name:{}'.format(CLUSTER_NAME), 'queue_name:nofollowqueue']
YARN_QUEUE_NOFOLLOW_METRICS_TAGS = ['queue_name:nofollowqueue'] + YARN_CLUSTER_METRICS_TAGS
3 changes: 2 additions & 1 deletion yarn/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_check_e2e(dd_agent_check, instance):
def assert_check(aggregator):
aggregator.assert_service_check('yarn.can_connect', AgentCheck.OK)
for metric in common.EXPECTED_METRICS:
aggregator.assert_metric_has_tag(metric, common.CLUSTER_TAG)
aggregator.assert_metric_has_tag(metric, common.YARN_CLUSTER_TAG)
aggregator.assert_metric_has_tag(metric, common.LEGACY_CLUSTER_TAG)

aggregator.assert_all_metrics_covered()
82 changes: 60 additions & 22 deletions yarn/tests/test_yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
YARN_AUTH_CONFIG,
YARN_CLUSTER_METRICS_TAGS,
YARN_CLUSTER_METRICS_VALUES,
YARN_CLUSTER_TAG,
YARN_CONFIG,
YARN_CONFIG_EXCLUDING_APP,
YARN_CONFIG_SPLIT_APPLICATION_TAGS,
Expand All @@ -39,6 +40,8 @@
YARN_SSL_VERIFY_TRUE_CONFIG,
)

EXPECTED_TAGS = YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS


def test_check(aggregator, mocked_request):
instance = YARN_CONFIG['instances'][0]
Expand All @@ -52,30 +55,30 @@ def test_check(aggregator, mocked_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.OK,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:word count', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:word count'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.CRITICAL,
tags=['app_queue:default', 'app_name:dead app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:dead app'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:new app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:new app'] + EXPECTED_TAGS,
)

# Check the YARN Cluster Metrics
for metric, value in iteritems(YARN_CLUSTER_METRICS_VALUES):
aggregator.assert_metric(metric, value=value, tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS, count=1)
aggregator.assert_metric(metric, value=value, tags=EXPECTED_TAGS, count=1)

# Check the YARN App Metrics
for metric, value in iteritems(YARN_APP_METRICS_VALUES):
Expand Down Expand Up @@ -114,25 +117,25 @@ def test_check_mapping(aggregator, mocked_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.OK,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:word count', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:word count'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.WARNING,
tags=['app_queue:default', 'app_name:dead app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:dead app'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:new app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:new app'] + EXPECTED_TAGS,
)


Expand All @@ -153,7 +156,7 @@ def test_check_excludes_app_metrics(aggregator, mocked_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.OK,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
count=3,
)

Expand All @@ -170,19 +173,19 @@ def test_custom_mapping(aggregator, mocked_request):
aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:word count', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:word count'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.WARNING,
tags=['app_queue:default', 'app_name:dead app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:dead app'] + EXPECTED_TAGS,
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.UNKNOWN,
tags=['app_queue:default', 'app_name:new app', 'optional:tag1', 'cluster_name:SparkCluster'],
tags=['app_queue:default', 'app_name:new app'] + EXPECTED_TAGS,
)


Expand All @@ -204,9 +207,45 @@ def test_check_splits_yarn_application_tags(aggregator, mocked_request):
'app_name:word count',
'app_key1:value1',
'app_key2:value2',
'optional:tag1',
'cluster_name:SparkCluster',
],
]
+ EXPECTED_TAGS,
)

# And check that the YARN application tags have not been split for other tags
aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.WARNING,
tags=[
'app_queue:default',
'app_name:dead app',
'app_tags:tag1,tag2',
]
+ EXPECTED_TAGS,
)


def test_disable_legacy_cluster_tag(aggregator, mocked_request):
instance = YARN_CONFIG_SPLIT_APPLICATION_TAGS['instances'][0]
instance['disable_legacy_cluster_tag'] = True

# Instantiate YarnCheck
yarn = YarnCheck('yarn', {}, [instance])

# Run the check once
yarn.check(instance)
# Check that the YARN application tags have been split for properly formatted tags without cluster_name tag
expected_tags = CUSTOM_TAGS
expected_tags.append(YARN_CLUSTER_TAG)
aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=[
'app_queue:default',
'app_name:word count',
'app_key1:value1',
'app_key2:value2',
]
+ expected_tags,
)

# And check that the YARN application tags have not been split for other tags
Expand All @@ -217,9 +256,8 @@ def test_check_splits_yarn_application_tags(aggregator, mocked_request):
'app_queue:default',
'app_name:dead app',
'app_tags:tag1,tag2',
'optional:tag1',
'cluster_name:SparkCluster',
],
]
+ expected_tags,
)


Expand All @@ -236,7 +274,7 @@ def test_auth(aggregator, mocked_auth_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.OK,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
count=4,
)

Expand All @@ -254,7 +292,7 @@ def test_ssl_verification(aggregator, mocked_bad_cert_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.CRITICAL,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
count=1,
)
pass
Expand All @@ -268,7 +306,7 @@ def test_ssl_verification(aggregator, mocked_bad_cert_request):
aggregator.assert_service_check(
SERVICE_CHECK_NAME,
status=YarnCheck.OK,
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
tags=EXPECTED_TAGS + ['url:{}'.format(RM_ADDRESS)],
count=4,
)

Expand Down