From 3514f1c50a8e14005d48e5982d2096abfb790e9d Mon Sep 17 00:00:00 2001 From: Amos Law Date: Tue, 15 Sep 2020 16:50:04 -0700 Subject: [PATCH] Add type hints to OTLP exporter --- .../opentelemetry/exporter/otlp/exporter.py | 35 ++++++++++++---- .../otlp/metrics_exporter/__init__.py | 36 +++++++++++----- .../exporter/otlp/trace_exporter/__init__.py | 41 +++++++++++-------- 3 files changed, 76 insertions(+), 36 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py index 7cd9f905e06..49edf6d8b1c 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py @@ -18,6 +18,9 @@ from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from time import sleep +from typing import Any, Callable, Dict, Generic, List, Optional +from typing import Sequence as TypingSequence +from typing import Text, Tuple, TypeVar from backoff import expo from google.rpc.error_details_pb2 import RetryInfo @@ -31,11 +34,17 @@ from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue from opentelemetry.proto.resource.v1.resource_pb2 import Resource +from opentelemetry.sdk.resources import Resource as SDKResource logger = logging.getLogger(__name__) +SDKDataT = TypeVar("SDKDataT") +ResourceDataT = TypeVar("ResourceDataT") +TypingResourceT = TypeVar("TypingResourceT") +ExportServiceRequestT = TypeVar("ExportServiceRequestT") +ExportResultT = TypeVar("ExportResultT") -def _translate_key_values(key, value): +def _translate_key_values(key: Text, value: Any) -> KeyValue: if isinstance(value, bool): any_value = AnyValue(bool_value=value) @@ -64,8 +73,12 @@ def _translate_key_values(key, value): def _get_resource_data( - sdk_resource_instrumentation_library_data, resource_class, name -): + sdk_resource_instrumentation_library_data: Dict[ + SDKResource, ResourceDataT + ], + resource_class: Callable[..., TypingResourceT], + name: str, +) -> List[TypingResourceT]: resource_data = [] @@ -101,7 +114,9 @@ def _get_resource_data( # pylint: disable=no-member -class OTLPExporterMixin(ABC): +class OTLPExporterMixin( + ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT] +): """OTLP span/metric exporter Args: @@ -114,12 +129,12 @@ def __init__( self, endpoint: str = "localhost:55680", credentials: ChannelCredentials = None, - metadata: tuple = None, + metadata: Tuple[Any] = None, ): super().__init__() self._metadata = metadata - self._collector_span_kwargs = None + self._collector_span_kwargs: Optional[Dict[str, Any]] = None if credentials is None: self._client = self._stub(insecure_channel(endpoint)) @@ -127,10 +142,12 @@ def __init__( self._client = self._stub(secure_channel(endpoint, credentials)) @abstractmethod - def _translate_data(self, data): + def _translate_data( + self, data: TypingSequence[SDKDataT] + ) -> ExportServiceRequestT: pass - def _export(self, data): + def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT: # expo returns a generator that yields delay values which grow # exponentially. Once delay is greater than max_value, the yielded # value will remain constant. @@ -190,5 +207,5 @@ def _export(self, data): return self._result.FAILURE - def shutdown(self): + def shutdown(self) -> None: pass diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py index 944428e37d0..630010f1b2e 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/metrics_exporter/__init__.py @@ -15,7 +15,7 @@ """OTLP Metrics Exporter""" import logging -from typing import Sequence +from typing import Any, Dict, List, Sequence, Type, TypeVar # pylint: disable=duplicate-code from opentelemetry.exporter.otlp.exporter import ( @@ -41,9 +41,8 @@ MetricDescriptor, ResourceMetrics, ) -from opentelemetry.sdk.metrics import Counter -from opentelemetry.sdk.metrics import Metric as SDKMetric from opentelemetry.sdk.metrics import ( + Counter, SumObserver, UpDownCounter, UpDownSumObserver, @@ -51,14 +50,19 @@ ValueRecorder, ) from opentelemetry.sdk.metrics.export import ( + MetricRecord, MetricsExporter, MetricsExportResult, ) +from opentelemetry.sdk.resources import Resource logger = logging.getLogger(__name__) +DataPointT = TypeVar("DataPointT", Int64DataPoint, DoubleDataPoint) -def _get_data_points(sdk_metric, data_point_class): +def _get_data_points( + sdk_metric: MetricRecord, data_point_class: Type[DataPointT] +) -> List[DataPointT]: data_points = [] @@ -89,7 +93,7 @@ def _get_data_points(sdk_metric, data_point_class): return data_points -def _get_temporality(instrument): +def _get_temporality(instrument: Any) -> MetricDescriptor.TemporalityValue: # pylint: disable=no-member if isinstance(instrument, (Counter, UpDownCounter)): temporality = MetricDescriptor.Temporality.DELTA @@ -107,7 +111,7 @@ def _get_temporality(instrument): return temporality -def _get_type(value_type): +def _get_type(value_type: Any) -> MetricDescriptor.TypeValue: # pylint: disable=no-member if value_type is int: type_ = MetricDescriptor.Type.INT64 @@ -126,7 +130,13 @@ def _get_type(value_type): return type_ -class OTLPMetricsExporter(MetricsExporter, OTLPExporterMixin): +class OTLPMetricsExporter( + MetricsExporter, + OTLPExporterMixin[ + MetricRecord, ExportMetricsServiceRequest, MetricsExportResult + ], +): + # pylint: disable=unsubscriptable-object """OTLP metrics exporter Args: @@ -138,11 +148,15 @@ class OTLPMetricsExporter(MetricsExporter, OTLPExporterMixin): _stub = MetricsServiceStub _result = MetricsExportResult - def _translate_data(self, data): + def _translate_data( + self, data: Sequence[MetricRecord] + ) -> ExportMetricsServiceRequest: # pylint: disable=too-many-locals,no-member # pylint: disable=attribute-defined-outside-init - sdk_resource_instrumentation_library_metrics = {} + sdk_resource_instrumentation_library_metrics: Dict[ + Resource, InstrumentationLibraryMetrics + ] = {} for sdk_metric in data: @@ -153,7 +167,7 @@ def _translate_data(self, data): sdk_metric.instrument.meter.resource ] = InstrumentationLibraryMetrics() - self._metric_descriptor_kwargs = {} + self._metric_descriptor_kwargs: Dict[Any, Any] = {} metric_descriptor = MetricDescriptor( name=sdk_metric.instrument.name, @@ -193,6 +207,6 @@ def _translate_data(self, data): ) ) - def export(self, metrics: Sequence[SDKMetric]) -> MetricsExportResult: + def export(self, metrics: Sequence[MetricRecord]) -> MetricsExportResult: # pylint: disable=arguments-differ return self._export(metrics) diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py index fd1d8e235e4..49c8c53155e 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py @@ -14,7 +14,7 @@ """OTLP Span Exporter""" import logging -from typing import Sequence +from typing import Any, Dict, Sequence from opentelemetry.exporter.otlp.exporter import ( OTLPExporterMixin, @@ -34,6 +34,7 @@ ) from opentelemetry.proto.trace.v1.trace_pb2 import Span as CollectorSpan from opentelemetry.proto.trace.v1.trace_pb2 import Status +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import Span as SDKSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult @@ -41,7 +42,11 @@ # pylint: disable=no-member -class OTLPSpanExporter(SpanExporter, OTLPExporterMixin): +class OTLPSpanExporter( + SpanExporter, + OTLPExporterMixin[SDKSpan, ExportTraceServiceRequest, SpanExportResult], +): + # pylint: disable=unsubscriptable-object """OTLP span exporter Args: @@ -53,34 +58,34 @@ class OTLPSpanExporter(SpanExporter, OTLPExporterMixin): _result = SpanExportResult _stub = TraceServiceStub - def _translate_name(self, sdk_span): + def _translate_name(self, sdk_span: SDKSpan) -> None: self._collector_span_kwargs["name"] = sdk_span.name - def _translate_start_time(self, sdk_span): + def _translate_start_time(self, sdk_span: SDKSpan) -> None: self._collector_span_kwargs[ "start_time_unix_nano" ] = sdk_span.start_time - def _translate_end_time(self, sdk_span): + def _translate_end_time(self, sdk_span: SDKSpan) -> None: self._collector_span_kwargs["end_time_unix_nano"] = sdk_span.end_time - def _translate_span_id(self, sdk_span): + def _translate_span_id(self, sdk_span: SDKSpan) -> None: self._collector_span_kwargs[ "span_id" ] = sdk_span.context.span_id.to_bytes(8, "big") - def _translate_trace_id(self, sdk_span): + def _translate_trace_id(self, sdk_span: SDKSpan) -> None: self._collector_span_kwargs[ "trace_id" ] = sdk_span.context.trace_id.to_bytes(16, "big") - def _translate_parent(self, sdk_span): + def _translate_parent(self, sdk_span: SDKSpan) -> None: if sdk_span.parent is not None: self._collector_span_kwargs[ "parent_span_id" ] = sdk_span.parent.span_id.to_bytes(8, "big") - def _translate_context_trace_state(self, sdk_span): + def _translate_context_trace_state(self, sdk_span: SDKSpan) -> None: if sdk_span.context.trace_state is not None: self._collector_span_kwargs["trace_state"] = ",".join( [ @@ -89,7 +94,7 @@ def _translate_context_trace_state(self, sdk_span): ] ) - def _translate_attributes(self, sdk_span): + def _translate_attributes(self, sdk_span: SDKSpan) -> None: if sdk_span.attributes: self._collector_span_kwargs["attributes"] = [] @@ -103,7 +108,7 @@ def _translate_attributes(self, sdk_span): except Exception as error: # pylint: disable=broad-except logger.exception(error) - def _translate_events(self, sdk_span): + def _translate_events(self, sdk_span: SDKSpan) -> None: if sdk_span.events: self._collector_span_kwargs["events"] = [] @@ -127,7 +132,7 @@ def _translate_events(self, sdk_span): collector_span_event ) - def _translate_links(self, sdk_span): + def _translate_links(self, sdk_span: SDKSpan) -> None: if sdk_span.links: self._collector_span_kwargs["links"] = [] @@ -153,16 +158,20 @@ def _translate_links(self, sdk_span): collector_span_link ) - def _translate_status(self, sdk_span): + def _translate_status(self, sdk_span: SDKSpan) -> None: if sdk_span.status is not None: self._collector_span_kwargs["status"] = Status( code=sdk_span.status.canonical_code.value, message=sdk_span.status.description, ) - def _translate_data(self, data) -> ExportTraceServiceRequest: + def _translate_data( + self, data: Sequence[SDKSpan] + ) -> ExportTraceServiceRequest: - sdk_resource_instrumentation_library_spans = {} + sdk_resource_instrumentation_library_spans: Dict[ + Resource, InstrumentationLibrarySpans + ] = {} for sdk_span in data: @@ -186,7 +195,7 @@ def _translate_data(self, data) -> ExportTraceServiceRequest: sdk_span.resource ] = instrumentation_library_spans - self._collector_span_kwargs = {} + self._collector_span_kwargs: Dict[str, Any] = {} self._translate_name(sdk_span) self._translate_start_time(sdk_span)