Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add yarn application status mapping #4642

Merged
merged 6 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions yarn/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ The Yarn check does not include any events.

Returns `CRITICAL` if the Agent cannot connect to the ResourceManager URI to collect metrics, otherwise `OK`.

**yarn.application.status**:

Returns per application status according to the mapping specified in the [`conf.yaml`][5] file.

## Troubleshooting
Need help? Contact [Datadog support][9].

Expand Down
22 changes: 22 additions & 0 deletions yarn/datadog_checks/yarn/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ instances:
# <TAG_KEY1>: <YARN_KEY>
# <TAG_KEY2>: <YARN_KEY>

## @param application_status_mapping - list of key:value elements - optional
##
## Custom application status mapping for service check `yarn.application.status`.
##
## The key is the Yarn application channel status. Valid keys are: `ALL`, `NEW`,
## `NEW_SAVING`, `SUBMITTED`, `ACCEPTED`, `RUNNING`, `FINISHED`, `FAILED` and `KILLED`.
## The value is the Datadog service check level. Valid values are: `ok`, `warning`, `critical`, and `unknown`.
## If a channel status is not mapped, it is mapped to `unknown`.
##
## Find below the default mapping configuration.
#
# application_status_mapping:
# ALL: unknown
# NEW: ok
# NEW_SAVING: ok
# SUBMITTED: ok
# ACCEPTED: ok
# RUNNING: ok
# FINISHED: ok
# FAILED: critical
# KILLED: critical

## @param collect_app_metrics - boolean - optional - default: true
## Set this parameter to false to remove yarn.app metrics from metric collection.
#
Expand Down
70 changes: 52 additions & 18 deletions yarn/datadog_checks/yarn/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from six.moves.urllib.parse import urljoin, urlsplit, urlunsplit

from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.errors import ConfigurationError

# Default settings
DEFAULT_RM_URI = 'http://localhost:8088'
Expand Down Expand Up @@ -33,8 +34,22 @@
# Name of the service check
SERVICE_CHECK_NAME = 'yarn.can_connect'

# Application states to collect
YARN_APPLICATION_STATES = 'RUNNING'
# Application states
YARN_APPLICATION_RUNNING = 'RUNNING'

APPLICATION_STATUS_SERVICE_CHECK = 'yarn.application.status'

DEFAULT_APPLICATION_STATUS_MAPPING = {
'ALL': 'unknown',
'NEW': 'ok',
'NEW_SAVING': 'ok',
'SUBMITTED': 'ok',
'ACCEPTED': 'ok',
YARN_APPLICATION_RUNNING: 'ok',
'FINISHED': 'ok',
'FAILED': 'critical',
'KILLED': 'critical',
}

# Cluster metrics identifier
YARN_CLUSTER_METRICS_ELEMENT = 'clusterMetrics'
Expand Down Expand Up @@ -144,6 +159,18 @@ class YarnCheck(AgentCheck):

_ALLOWED_APPLICATION_TAGS = ['applicationTags', 'applicationType', 'name', 'queue', 'user']

def __init__(self, *args, **kwargs):
super(YarnCheck, self).__init__(*args, **kwargs)
application_status_mapping = self.instances[0].get(
'application_status_mapping', DEFAULT_APPLICATION_STATUS_MAPPING
)
try:
self.application_status_mapping = {
k.upper(): getattr(AgentCheck, v.upper()) for k, v in application_status_mapping.items()
}
except AttributeError as e:
raise ConfigurationError("Invalid mapping: {}".format(e))

def check(self, instance):

# Get properties from conf file
Expand Down Expand Up @@ -203,25 +230,32 @@ def _yarn_app_metrics(self, rm_address, app_tags, addl_tags):
"""
Get metrics for running applications
"""
metrics_json = self._rest_request_to_json(rm_address, YARN_APPS_PATH, addl_tags, states=YARN_APPLICATION_STATES)
metrics_json = self._rest_request_to_json(rm_address, YARN_APPS_PATH, addl_tags)

if metrics_json and metrics_json['apps'] is not None and metrics_json['apps']['app'] is not None:

for app_json in metrics_json['apps']['app']:

tags = []
for dd_tag, yarn_key in iteritems(app_tags):
try:
val = app_json[yarn_key]
if val:
tags.append('{tag}:{value}'.format(tag=dd_tag, value=val))
except KeyError:
self.log.error("Invalid value {} for application_tag".format(yarn_key))

tags.extend(addl_tags)

self._set_yarn_metrics_from_json(tags, app_json, DEPRECATED_YARN_APP_METRICS)
self._set_yarn_metrics_from_json(tags, app_json, YARN_APP_METRICS)
tags = self._get_app_tags(app_json, app_tags) + addl_tags

if app_json['state'] == YARN_APPLICATION_RUNNING:
self._set_yarn_metrics_from_json(tags, app_json, DEPRECATED_YARN_APP_METRICS)
self._set_yarn_metrics_from_json(tags, app_json, YARN_APP_METRICS)

self.service_check(
APPLICATION_STATUS_SERVICE_CHECK,
self.application_status_mapping.get(app_json['state'], AgentCheck.UNKNOWN),
tags=tags,
)

def _get_app_tags(self, app_json, app_tags):
tags = []
for dd_tag, yarn_key in iteritems(app_tags):
try:
val = app_json[yarn_key]
if val:
tags.append('{tag}:{value}'.format(tag=dd_tag, value=val))
except KeyError:
self.log.error("Invalid value {} for application_tag".format(yarn_key))
return tags

def _yarn_node_metrics(self, rm_address, addl_tags):
"""
Expand Down
10 changes: 2 additions & 8 deletions yarn/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
import os

from datadog_checks.dev import get_docker_hostname
from datadog_checks.yarn.yarn import (
YARN_APPLICATION_STATES,
YARN_APPS_PATH,
YARN_CLUSTER_METRICS_PATH,
YARN_NODES_PATH,
YARN_SCHEDULER_PATH,
)
from datadog_checks.yarn.yarn import YARN_APPS_PATH, YARN_CLUSTER_METRICS_PATH, YARN_NODES_PATH, YARN_SCHEDULER_PATH

HERE = os.path.dirname(os.path.abspath(__file__))

Expand All @@ -23,7 +17,7 @@

# Service URLs
YARN_CLUSTER_METRICS_URL = '{}{}'.format(RM_ADDRESS, YARN_CLUSTER_METRICS_PATH)
YARN_APPS_URL = '{}{}?states={}'.format(RM_ADDRESS, YARN_APPS_PATH, YARN_APPLICATION_STATES)
YARN_APPS_URL = '{}{}'.format(RM_ADDRESS, YARN_APPS_PATH)
YARN_NODES_URL = '{}{}'.format(RM_ADDRESS, YARN_NODES_PATH)
YARN_SCHEDULER_URL = '{}{}'.format(RM_ADDRESS, YARN_SCHEDULER_PATH)

Expand Down
46 changes: 46 additions & 0 deletions yarn/tests/fixtures/apps_metrics
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,52 @@
"runningContainers": 0,
"memorySeconds": 151730,
"vcoreSeconds": 103
},
{
"finishedTime": 1326815598530,
"amContainerLogs": "http://host.domain.com:8042/node/containerlogs/container_1326815542473_0001_01_000001",
"trackingUI": "History",
"state": "KILLED",
"user": "user1",
"id": "application_1326815542473_0002",
"clusterId": 1326815542473,
"finalStatus": "KILLED",
"amHostHttpAddress": "host.domain.com:8042",
"progress": 30,
"name": "dead app",
"startedTime": 1326815573334,
"elapsedTime": 25196,
"diagnostics": "",
"trackingUrl": "http://host.domain.com:8088/proxy/application_1326815542473_0001/jobhistory/job/job_1326815542473_1_2",
"queue": "default",
"allocatedMB": 0,
"allocatedVCores": 0,
"runningContainers": 0,
"memorySeconds": 151730,
"vcoreSeconds": 103
},
{
"finishedTime": 1326815598530,
"amContainerLogs": "http://host.domain.com:8042/node/containerlogs/container_1326815542473_0001_01_000003",
"trackingUI": "History",
"state": "NEW",
"user": "user1",
"id": "application_1326815542473_0002",
"clusterId": 1326815542473,
"finalStatus": "KILLED",
"amHostHttpAddress": "host.domain.com:8042",
"progress": 30,
"name": "new app",
"startedTime": 1326815573334,
"elapsedTime": 25196,
"diagnostics": "",
"trackingUrl": "http://host.domain.com:8088/proxy/application_1326815542473_0001/jobhistory/job/job_1326815542473_1_3",
"queue": "default",
"allocatedMB": 0,
"allocatedVCores": 0,
"runningContainers": 0,
"memorySeconds": 151730,
"vcoreSeconds": 103
}
]
}
Expand Down
55 changes: 54 additions & 1 deletion yarn/tests/test_yarn.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import copy

from requests.exceptions import SSLError
from six import iteritems

from datadog_checks.yarn import YarnCheck
from datadog_checks.yarn.yarn import SERVICE_CHECK_NAME, YARN_APP_METRICS, YARN_QUEUE_METRICS
from datadog_checks.yarn.yarn import (
APPLICATION_STATUS_SERVICE_CHECK,
SERVICE_CHECK_NAME,
YARN_APP_METRICS,
YARN_QUEUE_METRICS,
)

from .common import (
CUSTOM_TAGS,
Expand Down Expand Up @@ -45,6 +52,24 @@ def test_check(aggregator, mocked_request):
tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS + ['url:{}'.format(RM_ADDRESS)],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:word count', 'optional:tag1', 'cluster_name:SparkCluster'],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.CRITICAL,
tags=['app_queue:default', 'app_name:dead app', 'optional:tag1', 'cluster_name:SparkCluster'],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:new app', 'optional:tag1', 'cluster_name:SparkCluster'],
)

# Check the YARN Cluster Metrics
for metric, value in iteritems(YARN_CLUSTER_METRICS_VALUES):
aggregator.assert_metric(metric, value=value, tags=YARN_CLUSTER_METRICS_TAGS + CUSTOM_TAGS, count=1)
Expand Down Expand Up @@ -96,6 +121,34 @@ def test_check_excludes_app_metrics(aggregator, mocked_request):
)


def test_custom_mapping(aggregator, mocked_request):
instance = copy.deepcopy(YARN_CONFIG['instances'][0])
instance['application_status_mapping'] = {'KILLED': 'WARNING', 'RUNNING': 'OK'}

yarn = YarnCheck('yarn', {}, [instance])

# Run the check once
yarn.check(instance)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.OK,
tags=['app_queue:default', 'app_name:word count', 'optional:tag1', 'cluster_name:SparkCluster'],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.WARNING,
tags=['app_queue:default', 'app_name:dead app', 'optional:tag1', 'cluster_name:SparkCluster'],
)

aggregator.assert_service_check(
APPLICATION_STATUS_SERVICE_CHECK,
status=YarnCheck.UNKNOWN,
tags=['app_queue:default', 'app_name:new app', 'optional:tag1', 'cluster_name:SparkCluster'],
)


def test_auth(aggregator, mocked_auth_request):
instance = YARN_AUTH_CONFIG['instances'][0]

Expand Down