Skip to content

Commit

Permalink
Run cloudera manager requests in parallel (#13499)
Browse files Browse the repository at this point in the history
* Added multithread

* Add hidden config for max connection pool size

* Added 'dd_environment' to test_metadata

* Update config

Co-authored-by: Andrew Zhang <[email protected]>
  • Loading branch information
jose-manuel-almaza and yzhan289 authored Dec 13, 2022
1 parent f8e4c7e commit 8b6f2eb
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 28 deletions.
7 changes: 7 additions & 0 deletions cloudera/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,10 @@ files:
required: true
value:
type: string
- name: max_parallel_requests
description: |
The maximum number of requests to Cloudera Manager that are allowed in parallel.
hidden: true
value:
type: integer
example: 100
3 changes: 2 additions & 1 deletion cloudera/datadog_checks/cloudera/api_client_factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import cm_client
import cm_client.rest
import packaging.version

from datadog_checks.base import ConfigurationError
Expand All @@ -9,7 +10,7 @@ def make_api_client(check, config):
cm_client.configuration.username = config.workload_username
cm_client.configuration.password = config.workload_password
api_client = cm_client.ApiClient(config.api_url)

api_client.rest_client = cm_client.rest.RESTClientObject(maxsize=(config.max_connection_pool_size))
check.log.debug('Getting version from cloudera[%s]', config.api_url)
cloudera_manager_resource_api = cm_client.ClouderaManagerResourceApi(api_client)
get_version_response = cloudera_manager_resource_api.get_version()
Expand Down
57 changes: 31 additions & 26 deletions cloudera/datadog_checks/cloudera/api_client_v7.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from concurrent.futures import ThreadPoolExecutor

import cm_client

from datadog_checks.base import AgentCheck
Expand All @@ -20,16 +22,17 @@ def _collect_clusters(self):
read_clusters_response = clusters_resource_api.read_clusters(cluster_type='any', view='full')
self._log.debug("Full clusters response:")
self._log.debug(read_clusters_response)
for cluster in read_clusters_response.items:
cluster_name = cluster.name
self._log.debug('cluster_name: %s', cluster_name)
self._log.debug('cluster: %s', cluster)
with ThreadPoolExecutor(max_workers=len(read_clusters_response.items) * 3) as executor:
for cluster in read_clusters_response.items:
cluster_name = cluster.name
self._log.debug('cluster_name: %s', cluster_name)
self._log.debug('cluster: %s', cluster)

tags = self._collect_cluster_tags(cluster, self._check.config.tags)
tags = self._collect_cluster_tags(cluster, self._check.config.tags)

self._collect_cluster_metrics(cluster_name, tags)
self._collect_cluster_service_check(cluster, tags)
self._collect_hosts(cluster_name)
executor.submit(self._collect_cluster_metrics, cluster_name, tags)
executor.submit(self._collect_cluster_service_check, cluster, tags)
executor.submit(self._collect_hosts, cluster_name)

@staticmethod
def _collect_cluster_tags(cluster, custom_tags):
Expand All @@ -50,25 +53,25 @@ def _collect_cluster_service_check(self, cluster, tags):
def _collect_cluster_metrics(self, cluster_name, tags):
    """Query cluster-category timeseries metrics for a single cluster.

    Builds a tsquery selecting the latest value of every metric listed in
    TIMESERIES_METRICS['cluster'] and submits it via _query_time_series.
    """
    metric_names = ','.join(f'last({metric}) AS {metric}' for metric in TIMESERIES_METRICS['cluster'])
    query = f'SELECT {metric_names} WHERE clusterName="{cluster_name}" AND category=CLUSTER'
    # The metric category is now derived from the response metadata inside
    # _query_time_series, so only the tags are passed along.
    self._query_time_series(query, tags=tags)

def _collect_hosts(self, cluster_name):
    """Collect metrics and the health service check for every host in a cluster.

    Fetches the full host list from the Cloudera Manager clusters API, then
    fans the per-host collection out to a thread pool so the individual API
    requests run in parallel instead of sequentially.
    """
    clusters_resource_api = cm_client.ClustersResourceApi(self._api_client)
    list_hosts_response = clusters_resource_api.list_hosts(cluster_name, view='full')
    self._log.debug("Full hosts response:")
    self._log.debug(list_hosts_response)
    # Four collection tasks are submitted per host. max(1, ...) guards
    # against ThreadPoolExecutor raising ValueError (max_workers must be
    # strictly positive) when the cluster reports no hosts.
    with ThreadPoolExecutor(max_workers=max(1, len(list_hosts_response.items) * 4)) as executor:
        for host in list_hosts_response.items:
            tags = self._collect_host_tags(host, self._check.config.tags)
            executor.submit(self._collect_host_metrics, host, tags)
            executor.submit(self._collect_role_metrics, host, tags)
            executor.submit(self._collect_disk_metrics, host, tags)
            executor.submit(self._collect_host_service_check, host, tags)

@staticmethod
def _collect_host_tags(host, custom_tags):
tags = [
f'cloudera_hostname:{host.hostname}',
f'cloudera_rack_id:{host.rack_id}',
f'cloudera_cluster:{host.cluster_ref.cluster_name}',
]
Expand All @@ -86,11 +89,12 @@ def _collect_host_tags(host, custom_tags):
def _collect_host_service_check(self, host, tags):
    """Submit the host health service check derived from the host's entity status.

    An unset entity_status maps to None (unknown status).
    """
    host_entity_status = ENTITY_STATUS[host.entity_status] if host.entity_status else None
    self._log.debug('host_entity_status: %s', host_entity_status)
    # `tags` already carries cloudera_hostname (added by _collect_host_tags),
    # so the hostname tag is not appended again here.
    self._check.service_check(HOST_HEALTH, host_entity_status, tags=tags)

def _collect_host_metrics(self, host, tags):
self._collect_host_native_metrics(host, tags)
self._collect_host_timeseries_metrics(host, tags)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(self._collect_host_native_metrics, host, tags)
executor.submit(self._collect_host_timeseries_metrics, host, tags)

def _collect_host_native_metrics(self, host, tags):
for metric in NATIVE_METRICS['host']:
Expand All @@ -99,19 +103,19 @@ def _collect_host_native_metrics(self, host, tags):
def _collect_host_timeseries_metrics(self, host, tags):
    """Query HOST-category timeseries metrics for a single host by host_id."""
    metric_names = ','.join(f'last({metric}) AS {metric}' for metric in TIMESERIES_METRICS['host'])
    query = f'SELECT {metric_names} WHERE hostId="{host.host_id}" AND category=HOST'
    # Category is resolved from the response metadata downstream; only tags
    # are forwarded.
    self._query_time_series(query, tags=tags)

def _collect_role_metrics(self, host, tags):
    """Query ROLE-category timeseries metrics for the roles on a single host."""
    metric_names = ','.join(f'last({metric}) AS {metric}' for metric in TIMESERIES_METRICS['role'])
    query = f'SELECT {metric_names} WHERE hostId="{host.host_id}" AND category=ROLE'
    # Category is resolved from the response metadata downstream; only tags
    # are forwarded.
    self._query_time_series(query, tags=tags)

def _collect_disk_metrics(self, host, tags):
    """Query DISK-category timeseries metrics for the disks on a single host."""
    metric_names = ','.join(f'last({metric}) AS {metric}' for metric in TIMESERIES_METRICS['disk'])
    query = f'SELECT {metric_names} WHERE hostId="{host.host_id}" AND category=DISK'
    # Category is resolved from the response metadata downstream; only tags
    # are forwarded.
    self._query_time_series(query, tags=tags)

def _query_time_series(self, query, category, tags):
def _query_time_series(self, query, tags):
self._log.debug('query: %s', query)
time_series_resource_api = cm_client.TimeSeriesResourceApi(self._api_client)
query_time_series_response = time_series_resource_api.query_time_series(query=query)
Expand All @@ -120,11 +124,12 @@ def _query_time_series(self, query, category, tags):
for ts in item.time_series:
self._log.debug('ts: %s', ts)
metric_name = ts.metadata.alias
full_metric_name = f'{category}.{metric_name}'
category_name = ts.metadata.attributes['category'].lower()
full_metric_name = f'{category_name}.{metric_name}'
for d in ts.data:
value = d.value
self._log.debug('full_metric_name: %s', full_metric_name)
self._log.debug('value: %s', value)
self._check.gauge(
full_metric_name, value, tags=tags + [f'cloudera_{category}:{ts.metadata.entity_name}']
full_metric_name, value, tags=tags + [f'cloudera_{category_name}:{ts.metadata.entity_name}']
)
4 changes: 4 additions & 0 deletions cloudera/datadog_checks/cloudera/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def instance_empty_default_hostname(field, value):
return False


def instance_max_parallel_requests(field, value):
    """Return the default for the hidden `max_parallel_requests` instance option."""
    default_parallel_requests = 100
    return default_parallel_requests


def instance_metric_patterns(field, value):
    """Return the default for `metric_patterns`, deferring to the field's declared default."""
    default = get_default_field_value(field, value)
    return default

Expand Down
1 change: 1 addition & 0 deletions cloudera/datadog_checks/cloudera/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Config:
api_url: str
disable_generic_tags: Optional[bool]
empty_default_hostname: Optional[bool]
max_parallel_requests: Optional[int]
metric_patterns: Optional[MetricPatterns]
min_collection_interval: Optional[float]
service: Optional[str]
Expand Down
2 changes: 1 addition & 1 deletion cloudera/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_timeseries_resource():
data=[
ApiTimeSeriesData(value=49.7),
],
metadata=ApiTimeSeriesMetadata(entity_name=category, alias=metric),
metadata=ApiTimeSeriesMetadata(attributes={'category': category}, alias=metric),
)
for metric in metrics
]
Expand Down
1 change: 1 addition & 0 deletions cloudera/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def test_given_api_v48_endpoint_when_check_runs_then_service_check_ok_and_metric
aggregator.assert_all_metrics_covered()


@pytest.mark.usefixtures('dd_environment')
@pytest.mark.integration
def test_metadata(cloudera_check, instance, datadog_agent, dd_run_check):
check = cloudera_check(instance)
Expand Down

0 comments on commit 8b6f2eb

Please sign in to comment.