Skip to content

Commit

Permalink
Aggregator calls Resource.create() to set default attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanielRN committed Aug 13, 2021
1 parent 8f6f1b0 commit a12c3a0
Showing 1 changed file with 20 additions and 36 deletions.
56 changes: 20 additions & 36 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,20 @@ def create(
"""
if not attributes:
attributes = {}

return get_aggregated_resources(
[], final_resource=Resource(attributes, schema_url)
)
resource = _DEFAULT_RESOURCE.merge(
OTELResourceDetector().detect()
).merge(Resource(attributes, schema_url))
if not resource.attributes.get(SERVICE_NAME, None):
default_service_name = "unknown_service"
process_executable_name = resource.attributes.get(
PROCESS_EXECUTABLE_NAME, None
)
if process_executable_name:
default_service_name += ":" + process_executable_name
resource = resource.merge(
Resource({SERVICE_NAME: default_service_name}, schema_url)
)
return resource

@staticmethod
def get_empty() -> "Resource":
Expand Down Expand Up @@ -272,7 +282,6 @@ def detect(self) -> "Resource":
def get_aggregated_resources(
detectors: typing.List["ResourceDetector"],
initial_resource: typing.Optional[Resource] = None,
final_resource: typing.Optional[Resource] = None,
timeout=5,
) -> "Resource":
"""Retrieves resources from detectors in the order that they were passed
Expand All @@ -282,50 +291,25 @@ def get_aggregated_resources(
:param timeout: Number of seconds to wait for each detector to return
:return:
"""
detectors_resource = (
_DEFAULT_RESOURCE.merge(initial_resource)
if initial_resource
else _DEFAULT_RESOURCE
)
detectors = [OTELResourceDetector()] + detectors
detectors_merged_resource = initial_resource or _EMPTY_RESOURCE

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(detector.detect) for detector in detectors]
for detector_ind, future in enumerate(futures):
detector = detectors[detector_ind]
try:
detected_resources = future.result(timeout=timeout)
detected_resource = future.result(timeout=timeout)
# pylint: disable=broad-except
except Exception as ex:
if detector.raise_on_error:
raise ex
logger.warning(
"Exception %s in detector %s, ignoring", ex, detector
)
detected_resources = _EMPTY_RESOURCE
detected_resource = _EMPTY_RESOURCE
finally:
detectors_resource = detectors_resource.merge(
detected_resources
detectors_merged_resource = detectors_merged_resource.merge(
detected_resource
)

merged_resource = (
detectors_resource.merge(final_resource)
if final_resource
else detectors_resource
)

if merged_resource.attributes.get(SERVICE_NAME):
return merged_resource

default_service_name = "unknown_service"

process_executable_name = merged_resource.attributes.get(
PROCESS_EXECUTABLE_NAME
)

if process_executable_name:
default_service_name += ":" + process_executable_name

return merged_resource.merge(
Resource({SERVICE_NAME: default_service_name})
)
return Resource.create().merge(detectors_merged_resource)

0 comments on commit a12c3a0

Please sign in to comment.