Support Kerberos auth #2823

Merged: 1 commit, Jan 7, 2019
@@ -16,6 +16,16 @@ instances:
# username: user
# password: pass

# If your service uses Kerberos authentication, you can optionally
# specify the Kerberos strategy to use.
# See https://github.com/requests/requests-kerberos#mutual-authentication
# kerberos: "required|optional|disabled"
# kerberos_delegate: false
# kerberos_force_initiate: false
# kerberos_hostname: null
# kerberos_principal: null
# kerberos_keytab: /path/to/keytab_file

# Optionally disable SSL validation. Sometimes when using proxies or self-signed certs
# we'll want to override validation.
# disable_ssl_validation: false
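
For reference, a minimal sketch of what a Kerberos-enabled instance looks like once the Agent hands it to the check below as a plain dict. The `hdfs_namenode_jmx_uri` key name and every value here are illustrative assumptions, not documented defaults.

```python
# Illustrative only: an instance dict as the check below would receive it.
# The JMX URI key name and all values are examples, not defaults.
instance = {
    "hdfs_namenode_jmx_uri": "http://namenode:50070",
    "kerberos": "required",              # one of: required / optional / disabled
    "kerberos_delegate": False,
    "kerberos_force_initiate": False,
    "kerberos_hostname": None,
    "kerberos_principal": None,
    "kerberos_keytab": "/etc/security/hdfs.keytab",
    "disable_ssl_validation": False,
}
```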
103 changes: 51 additions & 52 deletions hdfs_namenode/datadog_checks/hdfs_namenode/hdfs_namenode.py
@@ -1,46 +1,24 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)


"""
HDFS NameNode Metrics
---------------------
hdfs.namenode.capacity_total Total disk capacity in bytes
hdfs.namenode.capacity_used Disk usage in bytes
hdfs.namenode.capacity_remaining Remaining disk space left in bytes
hdfs.namenode.total_load Total load on the file system
hdfs.namenode.fs_lock_queue_length Lock queue length
hdfs.namenode.blocks_total Total number of blocks
hdfs.namenode.max_objects Maximum number of files HDFS supports
hdfs.namenode.files_total Total number of files
hdfs.namenode.pending_replication_blocks Number of blocks pending replication
hdfs.namenode.under_replicated_blocks Number of under replicated blocks
hdfs.namenode.scheduled_replication_blocks Number of blocks scheduled for replication
hdfs.namenode.pending_deletion_blocks Number of pending deletion blocks
hdfs.namenode.num_live_data_nodes Total number of live data nodes
hdfs.namenode.num_dead_data_nodes Total number of dead data nodes
hdfs.namenode.num_decom_live_data_nodes Number of decommissioning live data nodes
hdfs.namenode.num_decom_dead_data_nodes Number of decommissioning dead data nodes
hdfs.namenode.volume_failures_total Total volume failures
hdfs.namenode.estimated_capacity_lost_total Estimated capacity lost in bytes
hdfs.namenode.num_decommissioning_data_nodes Number of decommissioning data nodes
hdfs.namenode.num_stale_data_nodes Number of stale data nodes
hdfs.namenode.num_stale_storages Number of stale storages
hdfs.namenode.missing_blocks Number of missing blocks
hdfs.namenode.corrupt_blocks Number of corrupt blocks
"""

from __future__ import division

from six.moves.urllib.parse import urljoin
from six import iteritems
import os

import requests
import requests_kerberos
from requests.exceptions import Timeout, HTTPError, InvalidURL, ConnectionError
from simplejson import JSONDecodeError
from six import iteritems
from six.moves.urllib.parse import urljoin

from datadog_checks.base import AgentCheck, is_affirmative

from datadog_checks.checks import AgentCheck
KERBEROS_STRATEGIES = {
'required': requests_kerberos.REQUIRED,
'optional': requests_kerberos.OPTIONAL,
'disabled': requests_kerberos.DISABLED,
}


class HDFSNameNode(AgentCheck):
@@ -99,20 +77,11 @@ def check(self, instance):
tags.append("namenode_url:{}".format(jmx_address))
tags = list(set(tags))

# Authenticate our connection to JMX endpoint if required
username = instance.get('username')
password = instance.get('password')
auth = None
if username is not None and password is not None:
auth = (username, password)

# Get data from JMX
disable_ssl_validation = instance.get('disable_ssl_validation', False)
hdfs_system_state_beans = self._get_jmx_data(
jmx_address, auth, disable_ssl_validation, self.HDFS_NAME_SYSTEM_STATE_BEAN, tags
instance, jmx_address, self.HDFS_NAME_SYSTEM_STATE_BEAN, tags
)
hdfs_system_beans = self._get_jmx_data(
jmx_address, auth, disable_ssl_validation, self.HDFS_NAME_SYSTEM_BEAN, tags
instance, jmx_address, self.HDFS_NAME_SYSTEM_BEAN, tags
)

# Process the JMX data and send out metrics
@@ -130,13 +99,13 @@ def check(self, instance):
message="Connection to {} was successful".format(jmx_address),
)

def _get_jmx_data(self, jmx_address, auth, disable_ssl_validation, bean_name, tags):
def _get_jmx_data(self, instance, jmx_address, bean_name, tags):
"""
Get namenode beans data from JMX endpoint
"""

response = self._rest_request_to_json(
jmx_address, auth, disable_ssl_validation, self.JMX_PATH, {"qry": bean_name}, tags=tags
instance, jmx_address, self.JMX_PATH, {"qry": bean_name}, tags=tags
)
beans = response.get("beans", [])
return beans
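
For context, the NameNode's /jmx endpoint answers with a JSON document whose `beans` array holds one object per matching MBean, so `_get_jmx_data` effectively returns that list. A rough sketch of the shape, with attribute names abbreviated and values purely illustrative:

```python
# Illustrative only: rough shape of a /jmx response for the FSNamesystemState bean.
# Attribute names and values are examples, not an exhaustive list.
response = {
    "beans": [
        {
            "name": "Hadoop:service=NameNode,name=FSNamesystemState",
            "CapacityTotal": 41083600896,
            "CapacityRemaining": 27878948864,
            "NumLiveDataNodes": 3,
            "NumDeadDataNodes": 0,
        }
    ]
}
beans = response.get("beans", [])  # what _get_jmx_data hands back to check()
```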
@@ -174,13 +143,10 @@ def _set_metric(self, metric_name, metric_type, value, tags=None):
else:
self.log.error('Metric type "{}" unknown'.format(metric_type))

def _rest_request_to_json(self, address, auth, disable_ssl_validation, object_path, query_params, tags=None):
def _rest_request_to_json(self, instance, url, object_path, query_params, tags=None):
"""
Query the given URL and return the JSON response
"""
response_json = None
url = address

if object_path:
url = self._join_url_dir(url, object_path)

@@ -189,6 +155,33 @@ def _rest_request_to_json(self, address, auth, disable_ssl_validation, object_pa
query = '&'.join(['{}={}'.format(key, value) for key, value in iteritems(query_params)])
url = urljoin(url, '?' + query)

auth = None

# Authenticate our connection to JMX endpoint if required
kerberos = instance.get('kerberos')
username = instance.get('username')
password = instance.get('password')
if username is not None and password is not None:
auth = (username, password)
elif kerberos is not None:
if kerberos not in KERBEROS_STRATEGIES:
raise Exception('Invalid Kerberos strategy `{}`'.format(kerberos))

auth = requests_kerberos.HTTPKerberosAuth(
mutual_authentication=KERBEROS_STRATEGIES[kerberos],
delegate=is_affirmative(instance.get('kerberos_delegate', False)),
force_preemptive=is_affirmative(instance.get('kerberos_force_initiate', False)),
hostname_override=instance.get('kerberos_hostname'),
principal=instance.get('kerberos_principal')
)

disable_ssl_validation = instance.get('disable_ssl_validation', False)

old_keytab_path = None
if 'kerberos_keytab' in instance:
old_keytab_path = os.getenv('KRB5_CLIENT_KTNAME')
os.environ['KRB5_CLIENT_KTNAME'] = instance['kerberos_keytab']

self.log.debug('Attempting to connect to "{}"'.format(url))

try:
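
The body of the `try` block is collapsed in this hunk. Assuming it is a plain `requests.get`, the pieces assembled above would be consumed roughly as sketched here; this is an illustration, not the diff's actual request and error handling.

```python
# Sketch only: how url, auth and disable_ssl_validation fit together downstream.
# Timeout handling and the service-check logic live in the elided lines.
response = requests.get(
    url,
    auth=auth,                          # basic-auth tuple, HTTPKerberosAuth, or None
    verify=not disable_ssl_validation,  # skip TLS verification when configured
)
response.raise_for_status()
response_json = response.json()
```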
@@ -223,9 +216,15 @@ def _rest_request_to_json(self, address, auth, disable_ssl_validation, object_pa
self.service_check(self.JMX_SERVICE_CHECK, AgentCheck.CRITICAL, tags=tags, message=str(e))
raise

return response_json
else:
return response_json

finally:
if old_keytab_path is not None:
os.environ['KRB5_CLIENT_KTNAME'] = old_keytab_path

def _join_url_dir(self, url, *args):
@classmethod
def _join_url_dir(cls, url, *args):
"""
Join a URL with multiple directories
"""
3 changes: 1 addition & 2 deletions hdfs_namenode/requirements.in
@@ -1,2 +1 @@


requests-kerberos==0.12.0
2 changes: 1 addition & 1 deletion hdfs_namenode/setup.py
@@ -25,7 +25,7 @@ def get_requirements(fpath):
return f.readlines()


CHECKS_BASE_REQ = 'datadog_checks_base'
CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0'

setup(
name='datadog-hdfs_namenode',