From a12c3a0e3abefcac95e8af9ccb7af9bdfbe6ea77 Mon Sep 17 00:00:00 2001 From: "(Eliseo) Nathaniel Ruiz Nowell" Date: Wed, 4 Aug 2021 13:49:19 -0700 Subject: [PATCH] Aggregator calls Resource.create() to set default attributes --- .../opentelemetry/sdk/resources/__init__.py | 56 +++++++------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index d329be5aa06..39210fdf3e2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -165,10 +165,20 @@ def create( """ if not attributes: attributes = {} - - return get_aggregated_resources( - [], final_resource=Resource(attributes, schema_url) - ) + resource = _DEFAULT_RESOURCE.merge( + OTELResourceDetector().detect() + ).merge(Resource(attributes, schema_url)) + if not resource.attributes.get(SERVICE_NAME, None): + default_service_name = "unknown_service" + process_executable_name = resource.attributes.get( + PROCESS_EXECUTABLE_NAME, None + ) + if process_executable_name: + default_service_name += ":" + process_executable_name + resource = resource.merge( + Resource({SERVICE_NAME: default_service_name}, schema_url) + ) + return resource @staticmethod def get_empty() -> "Resource": @@ -272,7 +282,6 @@ def detect(self) -> "Resource": def get_aggregated_resources( detectors: typing.List["ResourceDetector"], initial_resource: typing.Optional[Resource] = None, - final_resource: typing.Optional[Resource] = None, timeout=5, ) -> "Resource": """Retrieves resources from detectors in the order that they were passed @@ -282,19 +291,14 @@ def get_aggregated_resources( :param timeout: Number of seconds to wait for each detector to return :return: """ - detectors_resource = ( - _DEFAULT_RESOURCE.merge(initial_resource) - if initial_resource - else _DEFAULT_RESOURCE - ) - detectors = [OTELResourceDetector()] + detectors + detectors_merged_resource = initial_resource or _EMPTY_RESOURCE with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(detector.detect) for detector in detectors] for detector_ind, future in enumerate(futures): detector = detectors[detector_ind] try: - detected_resources = future.result(timeout=timeout) + detected_resource = future.result(timeout=timeout) # pylint: disable=broad-except except Exception as ex: if detector.raise_on_error: @@ -302,30 +306,10 @@ def get_aggregated_resources( logger.warning( "Exception %s in detector %s, ignoring", ex, detector ) - detected_resources = _EMPTY_RESOURCE + detected_resource = _EMPTY_RESOURCE finally: - detectors_resource = detectors_resource.merge( - detected_resources + detectors_merged_resource = detectors_merged_resource.merge( + detected_resource ) - merged_resource = ( - detectors_resource.merge(final_resource) - if final_resource - else detectors_resource - ) - - if merged_resource.attributes.get(SERVICE_NAME): - return merged_resource - - default_service_name = "unknown_service" - - process_executable_name = merged_resource.attributes.get( - PROCESS_EXECUTABLE_NAME - ) - - if process_executable_name: - default_service_name += ":" + process_executable_name - - return merged_resource.merge( - Resource({SERVICE_NAME: default_service_name}) - ) + return Resource.create().merge(detectors_merged_resource)