Skip to content

Commit

Permalink
Add default and service name to get_aggregated_resource (#2013)
Browse files Browse the repository at this point in the history
* Add default & service name to resource aggregator

* Aggregator calls Resource.create() to set default attributes

* Allow get aggregated resources to remove defaults
  • Loading branch information
NathanielRN authored Aug 13, 2021
1 parent 240ee76 commit 3cdb54f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 30 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.4.0-0.23b0...HEAD)
- Fix documentation on well known exporters and variable OTEL_TRACES_EXPORTER which were misnamed [#2023](https://github.com/open-telemetry/opentelemetry-python/pull/2023)
- Fix documentation on well known exporters and variable OTEL_TRACES_EXPORTER which were misnamed
([#2023](https://github.com/open-telemetry/opentelemetry-python/pull/2023))
- `opentelemetry-sdk` `get_aggregated_resource()` returns default resource and service name
whenever called
([#2013](https://github.com/open-telemetry/opentelemetry-python/pull/2013))
- `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK
to let distros use its default implementation
([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937))
Expand Down
14 changes: 8 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,23 +291,25 @@ def get_aggregated_resources(
:param timeout: Number of seconds to wait for each detector to return
:return:
"""
final_resource = initial_resource or _EMPTY_RESOURCE
detectors = [OTELResourceDetector()] + detectors
detectors_merged_resource = initial_resource or Resource.create()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(detector.detect) for detector in detectors]
for detector_ind, future in enumerate(futures):
detector = detectors[detector_ind]
try:
detected_resources = future.result(timeout=timeout)
detected_resource = future.result(timeout=timeout)
# pylint: disable=broad-except
except Exception as ex:
if detector.raise_on_error:
raise ex
logger.warning(
"Exception %s in detector %s, ignoring", ex, detector
)
detected_resources = _EMPTY_RESOURCE
detected_resource = _EMPTY_RESOURCE
finally:
final_resource = final_resource.merge(detected_resources)
return final_resource
detectors_merged_resource = detectors_merged_resource.merge(
detected_resource
)

return detectors_merged_resource
83 changes: 60 additions & 23 deletions opentelemetry-sdk/tests/resources/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,18 @@ def test_invalid_resource_attribute_values(self):

def test_aggregated_resources_no_detectors(self):
aggregated_resources = resources.get_aggregated_resources([])
self.assertEqual(aggregated_resources, resources.Resource.get_empty())
self.assertEqual(
aggregated_resources,
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)

def test_aggregated_resources_with_static_resource(self):
def test_aggregated_resources_with_default_destroying_static_resource(
self,
):
static_resource = resources.Resource({"static_key": "static_value"})

self.assertEqual(
Expand Down Expand Up @@ -280,13 +289,19 @@ def test_aggregated_resources_multiple_detectors(self):
resources.get_aggregated_resources(
[resource_detector1, resource_detector2, resource_detector3]
),
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
}
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
}
)
),
)

Expand Down Expand Up @@ -321,18 +336,30 @@ def test_aggregated_resources_different_schema_urls(self):
resources.get_aggregated_resources(
[resource_detector1, resource_detector2]
),
resources.Resource(
{"key1": "value1", "key2": "value2", "key3": "value3"},
"url1",
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{"key1": "value1", "key2": "value2", "key3": "value3"},
"url1",
)
),
)
with self.assertLogs(level=ERROR) as log_entry:
self.assertEqual(
resources.get_aggregated_resources(
[resource_detector2, resource_detector3]
),
resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
)
),
)
self.assertIn("url1", log_entry.output[0])
Expand All @@ -347,14 +374,20 @@ def test_aggregated_resources_different_schema_urls(self):
resource_detector1,
]
),
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
)
),
)
self.assertIn("url1", log_entry.output[0])
Expand All @@ -366,7 +399,11 @@ def test_resource_detector_ignore_error(self):
resource_detector.raise_on_error = False
self.assertEqual(
resources.get_aggregated_resources([resource_detector]),
resources.Resource.get_empty(),
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)

def test_resource_detector_raise_error(self):
Expand Down

0 comments on commit 3cdb54f

Please sign in to comment.