Skip to content

Commit

Permalink
Support keytab files for kerberos (#2591)
Browse files Browse the repository at this point in the history
* Support keytab files for kerberos

* address review
  • Loading branch information
ofek authored Nov 15, 2018
1 parent 4e671d0 commit 6006452
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ instances:
# kerberos_force_initiate: false
# kerberos_hostname: null
# kerberos_principal: null
# kerberos_keytab: /path/to/keytab_file

# Optionally disable SSL validation. Sometimes when using proxies or self-signed certs
# we'll want to override validation.
Expand Down
98 changes: 43 additions & 55 deletions hdfs_datanode/datadog_checks/hdfs_datanode/hdfs_datanode.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import os

'''
HDFS DataNode Metrics
---------------------
hdfs.datanode.dfs_remaining The remaining disk space left in bytes
hdfs.datanode.dfs_capacity Disk capacity in bytes
hdfs.datanode.dfs_used Disk usage in bytes
hdfs.datanode.cache_capacity Cache capacity in bytes
hdfs.datanode.cache_used Cache used in bytes
hdfs.datanode.num_failed_volumes Number of failed volumes
hdfs.datanode.last_volume_failure_date Date the last volume failed
hdfs.datanode.estimated_capacity_lost_total The estimated capacity lost in bytes
hdfs.datanode.num_blocks_cached The number of blocks cached
hdfs.datanode.num_blocks_failed_to_cache The number of blocks that failed to cache
hdfs.datanode.num_blocks_failed_to_uncache The number of blocks that failed to be removed from the cache
'''

# stdlib
from urlparse import urljoin

# 3rd party
import requests
import requests_kerberos
from requests.exceptions import Timeout, HTTPError, InvalidURL, ConnectionError
from simplejson import JSONDecodeError
from six import iteritems
from six.moves.urllib.parse import urljoin

# Project
from datadog_checks.base import AgentCheck, is_affirmative

KERBEROS_STRATEGIES = {
Expand Down Expand Up @@ -77,41 +59,19 @@ def check(self, instance):
tags.append("datanode_url:{}".format(jmx_address))
tags = list(set(tags))

auth = None

# Authenticate our connection to JMX endpoint if required
kerberos = instance.get('kerberos')
username = instance.get('username')
password = instance.get('password')
if username is not None and password is not None:
auth = (username, password)
elif kerberos is not None:
if kerberos not in KERBEROS_STRATEGIES:
raise Exception('Invalid Kerberos strategy `{}`'.format(kerberos))

auth = requests_kerberos.HTTPKerberosAuth(
mutual_authentication=KERBEROS_STRATEGIES[kerberos],
delegate=is_affirmative(instance.get('kerberos_delegate', False)),
force_preemptive=is_affirmative(instance.get('kerberos_force_initiate', False)),
hostname_override=instance.get('kerberos_hostname'),
principal=instance.get('kerberos_principal')
)

disable_ssl_validation = is_affirmative(instance.get('disable_ssl_validation', False))

# Get data from JMX
hdfs_datanode_beans = self._get_jmx_data(jmx_address, auth, disable_ssl_validation, tags)
hdfs_datanode_beans = self._get_jmx_data(instance, jmx_address, tags)

# Process the JMX data and send out metrics
if hdfs_datanode_beans:
self._hdfs_datanode_metrics(hdfs_datanode_beans, tags)

def _get_jmx_data(self, jmx_address, auth, disable_ssl_validation, tags):
def _get_jmx_data(self, instance, jmx_address, tags):
"""
Get datanode beans data from JMX endpoint
"""
response = self._rest_request_to_json(
jmx_address, auth, disable_ssl_validation, self.JMX_PATH, {'qry': self.HDFS_DATANODE_BEAN_NAME}, tags=tags
instance, jmx_address, self.JMX_PATH, {'qry': self.HDFS_DATANODE_BEAN_NAME}, tags=tags
)
beans = response.get('beans', [])
return beans
Expand All @@ -126,7 +86,7 @@ def _hdfs_datanode_metrics(self, beans, tags):

self.log.debug("Bean name retrieved: {}".format(bean_name))

for metric, (metric_name, metric_type) in self.HDFS_METRICS.iteritems():
for metric, (metric_name, metric_type) in iteritems(self.HDFS_METRICS):
metric_value = bean.get(metric)
if metric_value is not None:
self._set_metric(metric_name, metric_type, metric_value, tags)
Expand All @@ -140,22 +100,45 @@ def _set_metric(self, metric_name, metric_type, value, tags=None):
else:
self.log.error('Metric type "{}" unknown'.format(metric_type))

def _rest_request_to_json(self, address, auth, disable_ssl_validation, object_path, query_params, tags):
def _rest_request_to_json(self, instance, url, object_path, query_params, tags):
"""
Query the given URL and return the JSON response
"""
response_json = None

url = address

if object_path:
url = self._join_url_dir(url, object_path)

# Add query_params as arguments
if query_params:
query = '&'.join(['{}={}'.format(key, value) for key, value in query_params.iteritems()])
query = '&'.join(['{}={}'.format(key, value) for key, value in iteritems(query_params)])
url = urljoin(url, '?' + query)

auth = None

# Authenticate our connection to JMX endpoint if required
kerberos = instance.get('kerberos')
username = instance.get('username')
password = instance.get('password')
if username is not None and password is not None:
auth = (username, password)
elif kerberos is not None:
if kerberos not in KERBEROS_STRATEGIES:
raise Exception('Invalid Kerberos strategy `{}`'.format(kerberos))

auth = requests_kerberos.HTTPKerberosAuth(
mutual_authentication=KERBEROS_STRATEGIES[kerberos],
delegate=is_affirmative(instance.get('kerberos_delegate', False)),
force_preemptive=is_affirmative(instance.get('kerberos_force_initiate', False)),
hostname_override=instance.get('kerberos_hostname'),
principal=instance.get('kerberos_principal')
)

disable_ssl_validation = is_affirmative(instance.get('disable_ssl_validation', False))

old_keytab_path = None
if 'kerberos_keytab' in instance:
old_keytab_path = os.getenv('KRB5_CLIENT_KTNAME')
os.environ['KRB5_CLIENT_KTNAME'] = instance['kerberos_keytab']

self.log.debug('Attempting to connect to "{}"'.format(url))

try:
Expand Down Expand Up @@ -195,9 +178,14 @@ def _rest_request_to_json(self, address, auth, disable_ssl_validation, object_pa
self.JMX_SERVICE_CHECK, AgentCheck.OK, tags=tags, message="Connection to {} was successful".format(url)
)

return response_json
return response_json

finally:
if old_keytab_path is not None:
os.environ['KRB5_CLIENT_KTNAME'] = old_keytab_path

def _join_url_dir(self, url, *args):
@classmethod
def _join_url_dir(cls, url, *args):
"""
Join a URL with multiple directories
"""
Expand Down

0 comments on commit 6006452

Please sign in to comment.