From 8f6f1b044944ac96469d6f3ae825d9d2df00913d Mon Sep 17 00:00:00 2001 From: "(Eliseo) Nathaniel Ruiz Nowell" Date: Fri, 30 Jul 2021 15:33:15 -0700 Subject: [PATCH 1/3] Add default & service name to resource aggregator --- CHANGELOG.md | 6 +- .../opentelemetry/sdk/resources/__init__.py | 52 ++++++--- .../tests/resources/test_resources.py | 101 +++++++++++++----- 3 files changed, 113 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4d89e055a1..735e546a7a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.4.0-0.23b0...HEAD) -- Fix documentation on well known exporters and variable OTEL_TRACES_EXPORTER which were misnamed [#2023](https://github.com/open-telemetry/opentelemetry-python/pull/2023) +- Fix documentation on well known exporters and variable OTEL_TRACES_EXPORTER which were misnamed + ([#2023](https://github.com/open-telemetry/opentelemetry-python/pull/2023)) +- `opentelemetry-sdk` `get_aggregated_resource()` returns default resource and service name + whenever called + ([#2013](https://github.com/open-telemetry/opentelemetry-python/pull/2013)) - `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK to let distros use its default implementation ([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 3ad01b0aec2..d329be5aa06 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -165,20 +165,10 @@ def create( """ if not attributes: attributes = {} - resource = _DEFAULT_RESOURCE.merge( - OTELResourceDetector().detect() - ).merge(Resource(attributes, schema_url)) - if not resource.attributes.get(SERVICE_NAME, None): - default_service_name = "unknown_service" - process_executable_name = resource.attributes.get( - PROCESS_EXECUTABLE_NAME, None - ) - if process_executable_name: - default_service_name += ":" + process_executable_name - resource = resource.merge( - Resource({SERVICE_NAME: default_service_name}, schema_url) - ) - return resource + + return get_aggregated_resources( + [], final_resource=Resource(attributes, schema_url) + ) @staticmethod def get_empty() -> "Resource": @@ -282,6 +272,7 @@ def detect(self) -> "Resource": def get_aggregated_resources( detectors: typing.List["ResourceDetector"], initial_resource: typing.Optional[Resource] = None, + final_resource: typing.Optional[Resource] = None, timeout=5, ) -> "Resource": """Retrieves resources from detectors in the order that they were passed @@ -291,7 +282,11 @@ def get_aggregated_resources( :param timeout: Number of seconds to wait for each detector to return :return: """ - final_resource = initial_resource or _EMPTY_RESOURCE + detectors_resource = ( + _DEFAULT_RESOURCE.merge(initial_resource) + if initial_resource + else _DEFAULT_RESOURCE + ) detectors = [OTELResourceDetector()] + detectors with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: @@ -309,5 +304,28 @@ def get_aggregated_resources( ) detected_resources = _EMPTY_RESOURCE finally: - final_resource = final_resource.merge(detected_resources) - return final_resource + detectors_resource = detectors_resource.merge( + detected_resources + ) + + merged_resource = ( + detectors_resource.merge(final_resource) + if final_resource + else detectors_resource + ) + + if merged_resource.attributes.get(SERVICE_NAME): + return merged_resource + + default_service_name = "unknown_service" + + process_executable_name = merged_resource.attributes.get( + PROCESS_EXECUTABLE_NAME + ) + + if process_executable_name: + default_service_name += ":" + process_executable_name + + return merged_resource.merge( + Resource({SERVICE_NAME: default_service_name}) + ) diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index 1f2a094f94f..a3b5044360f 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -230,7 +230,14 @@ def test_invalid_resource_attribute_values(self): def test_aggregated_resources_no_detectors(self): aggregated_resources = resources.get_aggregated_resources([]) - self.assertEqual(aggregated_resources, resources.Resource.get_empty()) + self.assertEqual( + aggregated_resources, + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ), + ) def test_aggregated_resources_with_static_resource(self): static_resource = resources.Resource({"static_key": "static_value"}) @@ -239,7 +246,11 @@ def test_aggregated_resources_with_static_resource(self): resources.get_aggregated_resources( [], initial_resource=static_resource ), - static_resource, + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge(static_resource), ) resource_detector = mock.Mock(spec=resources.ResourceDetector) @@ -250,11 +261,17 @@ def test_aggregated_resources_with_static_resource(self): resources.get_aggregated_resources( [resource_detector], initial_resource=static_resource ), - resources.Resource( - { - "static_key": "try_to_overwrite_existing_value", - "key": "value", - } + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge( + resources.Resource( + { + "static_key": "try_to_overwrite_existing_value", + "key": "value", + } + ) ), ) @@ -280,13 +297,19 @@ def test_aggregated_resources_multiple_detectors(self): resources.get_aggregated_resources( [resource_detector1, resource_detector2, resource_detector3] ), - resources.Resource( - { - "key1": "value1", - "key2": "try_to_overwrite_existing_value", - "key3": "try_to_overwrite_existing_value", - "key4": "value4", - } + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge( + resources.Resource( + { + "key1": "value1", + "key2": "try_to_overwrite_existing_value", + "key3": "try_to_overwrite_existing_value", + "key4": "value4", + } + ) ), ) @@ -321,9 +344,15 @@ def test_aggregated_resources_different_schema_urls(self): resources.get_aggregated_resources( [resource_detector1, resource_detector2] ), - resources.Resource( - {"key1": "value1", "key2": "value2", "key3": "value3"}, - "url1", + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge( + resources.Resource( + {"key1": "value1", "key2": "value2", "key3": "value3"}, + "url1", + ) ), ) with self.assertLogs(level=ERROR) as log_entry: @@ -331,8 +360,14 @@ def test_aggregated_resources_different_schema_urls(self): resources.get_aggregated_resources( [resource_detector2, resource_detector3] ), - resources.Resource( - {"key2": "value2", "key3": "value3"}, "url1" + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge( + resources.Resource( + {"key2": "value2", "key3": "value3"}, "url1" + ) ), ) self.assertIn("url1", log_entry.output[0]) @@ -347,14 +382,20 @@ def test_aggregated_resources_different_schema_urls(self): resource_detector1, ] ), - resources.Resource( - { - "key1": "value1", - "key2": "try_to_overwrite_existing_value", - "key3": "try_to_overwrite_existing_value", - "key4": "value4", - }, - "url1", + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ).merge( + resources.Resource( + { + "key1": "value1", + "key2": "try_to_overwrite_existing_value", + "key3": "try_to_overwrite_existing_value", + "key4": "value4", + }, + "url1", + ) ), ) self.assertIn("url1", log_entry.output[0]) @@ -366,7 +407,11 @@ def test_resource_detector_ignore_error(self): resource_detector.raise_on_error = False self.assertEqual( resources.get_aggregated_resources([resource_detector]), - resources.Resource.get_empty(), + resources._DEFAULT_RESOURCE.merge( + resources.Resource( + {resources.SERVICE_NAME: "unknown_service"}, "" + ) + ), ) def test_resource_detector_raise_error(self): From a12c3a0e3abefcac95e8af9ccb7af9bdfbe6ea77 Mon Sep 17 00:00:00 2001 From: "(Eliseo) Nathaniel Ruiz Nowell" Date: Wed, 4 Aug 2021 13:49:19 -0700 Subject: [PATCH 2/3] Aggregator calls Resource.create() to set default attributes --- .../opentelemetry/sdk/resources/__init__.py | 56 +++++++------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index d329be5aa06..39210fdf3e2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -165,10 +165,20 @@ def create( """ if not attributes: attributes = {} - - return get_aggregated_resources( - [], final_resource=Resource(attributes, schema_url) - ) + resource = _DEFAULT_RESOURCE.merge( + OTELResourceDetector().detect() + ).merge(Resource(attributes, schema_url)) + if not resource.attributes.get(SERVICE_NAME, None): + default_service_name = "unknown_service" + process_executable_name = resource.attributes.get( + PROCESS_EXECUTABLE_NAME, None + ) + if process_executable_name: + default_service_name += ":" + process_executable_name + resource = resource.merge( + Resource({SERVICE_NAME: default_service_name}, schema_url) + ) + return resource @staticmethod def get_empty() -> "Resource": @@ -272,7 +282,6 @@ def detect(self) -> "Resource": def get_aggregated_resources( detectors: typing.List["ResourceDetector"], initial_resource: typing.Optional[Resource] = None, - final_resource: typing.Optional[Resource] = None, timeout=5, ) -> "Resource": """Retrieves resources from detectors in the order that they were passed @@ -282,19 +291,14 @@ def get_aggregated_resources( :param timeout: Number of seconds to wait for each detector to return :return: """ - detectors_resource = ( - _DEFAULT_RESOURCE.merge(initial_resource) - if initial_resource - else _DEFAULT_RESOURCE - ) - detectors = [OTELResourceDetector()] + detectors + detectors_merged_resource = initial_resource or _EMPTY_RESOURCE with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(detector.detect) for detector in detectors] for detector_ind, future in enumerate(futures): detector = detectors[detector_ind] try: - detected_resources = future.result(timeout=timeout) + detected_resource = future.result(timeout=timeout) # pylint: disable=broad-except except Exception as ex: if detector.raise_on_error: @@ -302,30 +306,10 @@ def get_aggregated_resources( logger.warning( "Exception %s in detector %s, ignoring", ex, detector ) - detected_resources = _EMPTY_RESOURCE + detected_resource = _EMPTY_RESOURCE finally: - detectors_resource = detectors_resource.merge( - detected_resources + detectors_merged_resource = detectors_merged_resource.merge( + detected_resource ) - merged_resource = ( - detectors_resource.merge(final_resource) - if final_resource - else detectors_resource - ) - - if merged_resource.attributes.get(SERVICE_NAME): - return merged_resource - - default_service_name = "unknown_service" - - process_executable_name = merged_resource.attributes.get( - PROCESS_EXECUTABLE_NAME - ) - - if process_executable_name: - default_service_name += ":" + process_executable_name - - return merged_resource.merge( - Resource({SERVICE_NAME: default_service_name}) - ) + return Resource.create().merge(detectors_merged_resource) From 47a45a1e9a3cb364722f6ee63c80428a019f4e99 Mon Sep 17 00:00:00 2001 From: "(Eliseo) Nathaniel Ruiz Nowell" Date: Wed, 4 Aug 2021 16:01:55 -0700 Subject: [PATCH 3/3] Allow get aggregated resources to remove defaults --- .../opentelemetry/sdk/resources/__init__.py | 4 +-- .../tests/resources/test_resources.py | 26 +++++++------------ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 39210fdf3e2..5878f375d79 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -291,7 +291,7 @@ def get_aggregated_resources( :param timeout: Number of seconds to wait for each detector to return :return: """ - detectors_merged_resource = initial_resource or _EMPTY_RESOURCE + detectors_merged_resource = initial_resource or Resource.create() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(detector.detect) for detector in detectors] @@ -312,4 +312,4 @@ def get_aggregated_resources( detected_resource ) - return Resource.create().merge(detectors_merged_resource) + return detectors_merged_resource diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index a3b5044360f..25e8ddb8674 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -239,18 +239,16 @@ def test_aggregated_resources_no_detectors(self): ), ) - def test_aggregated_resources_with_static_resource(self): + def test_aggregated_resources_with_default_destroying_static_resource( + self, + ): static_resource = resources.Resource({"static_key": "static_value"}) self.assertEqual( resources.get_aggregated_resources( [], initial_resource=static_resource ), - resources._DEFAULT_RESOURCE.merge( - resources.Resource( - {resources.SERVICE_NAME: "unknown_service"}, "" - ) - ).merge(static_resource), + static_resource, ) resource_detector = mock.Mock(spec=resources.ResourceDetector) @@ -261,17 +259,11 @@ def test_aggregated_resources_with_static_resource(self): resources.get_aggregated_resources( [resource_detector], initial_resource=static_resource ), - resources._DEFAULT_RESOURCE.merge( - resources.Resource( - {resources.SERVICE_NAME: "unknown_service"}, "" - ) - ).merge( - resources.Resource( - { - "static_key": "try_to_overwrite_existing_value", - "key": "value", - } - ) + resources.Resource( + { + "static_key": "try_to_overwrite_existing_value", + "key": "value", + } ), )