Skip to content

Commit

Permalink
Add default & service name to resource aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanielRN committed Aug 4, 2021
1 parent e2eb73c commit bfe963a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 45 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.4.0-0.23b0...HEAD)
- `opentelemetry-sdk` `get_aggregated_resource()` returns default resource and service name
whenever called
([#2013])(https://github.com/open-telemetry/opentelemetry-python/pull/2013)
- `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK
to let distros use its default implementation
([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937))
Expand Down
52 changes: 35 additions & 17 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,10 @@ def create(
"""
if not attributes:
attributes = {}
resource = _DEFAULT_RESOURCE.merge(
OTELResourceDetector().detect()
).merge(Resource(attributes, schema_url))
if not resource.attributes.get(SERVICE_NAME, None):
default_service_name = "unknown_service"
process_executable_name = resource.attributes.get(
PROCESS_EXECUTABLE_NAME, None
)
if process_executable_name:
default_service_name += ":" + process_executable_name
resource = resource.merge(
Resource({SERVICE_NAME: default_service_name}, schema_url)
)
return resource

return get_aggregated_resources(
[], final_resource=Resource(attributes, schema_url)
)

@staticmethod
def get_empty() -> "Resource":
Expand Down Expand Up @@ -282,6 +272,7 @@ def detect(self) -> "Resource":
def get_aggregated_resources(
detectors: typing.List["ResourceDetector"],
initial_resource: typing.Optional[Resource] = None,
final_resource: typing.Optional[Resource] = None,
timeout=5,
) -> "Resource":
"""Retrieves resources from detectors in the order that they were passed
Expand All @@ -291,7 +282,11 @@ def get_aggregated_resources(
:param timeout: Number of seconds to wait for each detector to return
:return:
"""
final_resource = initial_resource or _EMPTY_RESOURCE
detectors_resource = (
_DEFAULT_RESOURCE.merge(initial_resource)
if initial_resource
else _DEFAULT_RESOURCE
)
detectors = [OTELResourceDetector()] + detectors

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
Expand All @@ -309,5 +304,28 @@ def get_aggregated_resources(
)
detected_resources = _EMPTY_RESOURCE
finally:
final_resource = final_resource.merge(detected_resources)
return final_resource
detectors_resource = detectors_resource.merge(
detected_resources
)

merged_resource = (
detectors_resource.merge(final_resource)
if final_resource
else detectors_resource
)

if merged_resource.attributes.get(SERVICE_NAME):
return merged_resource

default_service_name = "unknown_service"

process_executable_name = merged_resource.attributes.get(
PROCESS_EXECUTABLE_NAME
)

if process_executable_name:
default_service_name += ":" + process_executable_name

return merged_resource.merge(
Resource({SERVICE_NAME: default_service_name})
)
101 changes: 73 additions & 28 deletions opentelemetry-sdk/tests/resources/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,14 @@ def test_invalid_resource_attribute_values(self):

def test_aggregated_resources_no_detectors(self):
aggregated_resources = resources.get_aggregated_resources([])
self.assertEqual(aggregated_resources, resources.Resource.get_empty())
self.assertEqual(
aggregated_resources,
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)

def test_aggregated_resources_with_static_resource(self):
static_resource = resources.Resource({"static_key": "static_value"})
Expand All @@ -239,7 +246,11 @@ def test_aggregated_resources_with_static_resource(self):
resources.get_aggregated_resources(
[], initial_resource=static_resource
),
static_resource,
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(static_resource),
)

resource_detector = mock.Mock(spec=resources.ResourceDetector)
Expand All @@ -250,11 +261,17 @@ def test_aggregated_resources_with_static_resource(self):
resources.get_aggregated_resources(
[resource_detector], initial_resource=static_resource
),
resources.Resource(
{
"static_key": "try_to_overwrite_existing_value",
"key": "value",
}
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{
"static_key": "try_to_overwrite_existing_value",
"key": "value",
}
)
),
)

Expand All @@ -280,13 +297,19 @@ def test_aggregated_resources_multiple_detectors(self):
resources.get_aggregated_resources(
[resource_detector1, resource_detector2, resource_detector3]
),
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
}
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
}
)
),
)

Expand Down Expand Up @@ -321,18 +344,30 @@ def test_aggregated_resources_different_schema_urls(self):
resources.get_aggregated_resources(
[resource_detector1, resource_detector2]
),
resources.Resource(
{"key1": "value1", "key2": "value2", "key3": "value3"},
"url1",
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{"key1": "value1", "key2": "value2", "key3": "value3"},
"url1",
)
),
)
with self.assertLogs(level=ERROR) as log_entry:
self.assertEqual(
resources.get_aggregated_resources(
[resource_detector2, resource_detector3]
),
resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{"key2": "value2", "key3": "value3"}, "url1"
)
),
)
self.assertIn("url1", log_entry.output[0])
Expand All @@ -347,14 +382,20 @@ def test_aggregated_resources_different_schema_urls(self):
resource_detector1,
]
),
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
).merge(
resources.Resource(
{
"key1": "value1",
"key2": "try_to_overwrite_existing_value",
"key3": "try_to_overwrite_existing_value",
"key4": "value4",
},
"url1",
)
),
)
self.assertIn("url1", log_entry.output[0])
Expand All @@ -366,7 +407,11 @@ def test_resource_detector_ignore_error(self):
resource_detector.raise_on_error = False
self.assertEqual(
resources.get_aggregated_resources([resource_detector]),
resources.Resource.get_empty(),
resources._DEFAULT_RESOURCE.merge(
resources.Resource(
{resources.SERVICE_NAME: "unknown_service"}, ""
)
),
)

def test_resource_detector_raise_error(self):
Expand Down

0 comments on commit bfe963a

Please sign in to comment.