Skip to content

Commit

Permalink
Add conversion to TimeSeries methods
Browse files Browse the repository at this point in the history
  • Loading branch information
shovnik committed Dec 1, 2020
1 parent aba6e97 commit 8750f9d
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
((#180)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180])
- Add Exporter constructor validation methods
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from typing import Dict, Sequence

from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
Expand All @@ -24,6 +29,15 @@
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
Expand Down Expand Up @@ -55,6 +69,14 @@ def __init__(
self.tls_config = tls_config
self.proxies = proxies

self.converter_map = {
MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
SumAggregator: self._convert_from_sum,
HistogramAggregator: self._convert_from_histogram,
LastValueAggregator: self._convert_from_last_value,
ValueObserverAggregator: self._convert_from_value_observer,
}

@property
def endpoint(self):
return self._endpoint
Expand Down Expand Up @@ -142,50 +164,145 @@ def export(
def shutdown(self) -> None:
raise NotImplementedError()

def convert_to_timeseries(
def _convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
raise NotImplementedError()

def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
raise NotImplementedError()
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = self.converter_map.get(aggregator_type)
if converter:
timeseries.extend(converter(export_record))
else:
logger.warning(
"%s aggregator is not supported, record dropped",
aggregator_type,
)
return timeseries

def convert_from_min_max_sum_count(
def _convert_from_sum(
self, sum_record: ExportRecord
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
sum_record,
sum_record.instrument.name + "_sum",
sum_record.aggregator.checkpoint,
)
]

def _convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

def convert_from_histogram(
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count"]:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self._create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries

def _convert_from_histogram(
self, histogram_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
) -> Sequence[TimeSeries]:
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
bound_str = "+Inf" if bound == float("inf") else str(bound)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self._create_timeseries(
histogram_record,
histogram_record.instrument.name + "_histogram",
value,
extra_label=("le", bound_str),
)
)
return timeseries

def convert_from_last_value(
def _convert_from_last_value(
self, last_value_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

def convert_from_value_observer(
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
last_value_record,
last_value_record.instrument.name + "_last",
last_value_record.aggregator.checkpoint,
)
]

def _convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count", "last"]:
timeseries.append(
self._create_timeseries(
value_observer_record,
value_observer_record.instrument.name + "_" + agg_type,
getattr(
value_observer_record.aggregator.checkpoint, agg_type
),
)
)
return timeseries

def convert_from_quantile(
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def _convert_from_quantile(
self, summary_record: ExportRecord
) -> TimeSeries:
) -> Sequence[TimeSeries]:
raise NotImplementedError()

# pylint: disable=no-member
def create_timeseries(
self, export_record: ExportRecord, name, value: float
def _create_timeseries(
self,
export_record: ExportRecord,
name: str,
value: float,
extra_label: (str, str) = None,
) -> TimeSeries:
raise NotImplementedError()

def create_sample(self, timestamp: int, value: float) -> Sample:
raise NotImplementedError()
timeseries = TimeSeries()
seen = set()

def add_label(label_name: str, label_value: str):
# Label name must contain only alphanumeric characters and underscores
label_name = re.sub("[^\\w_]", "_", label_name)
if label_name not in seen:
label = Label()
label.name = label_name
label.value = label_value
timeseries.labels.append(label)
seen.add(label_name)
else:
logger.warning(
"Duplicate label with name %s and value %s",
label_name,
label_value,
)

def create_label(self, name: str, value: str) -> Label:
raise NotImplementedError()
# The __name__ label is required by PromQL as its value appears as the metric_name
add_label("__name__", name)
if extra_label:
add_label(extra_label[0], extra_label[1])
if export_record.resource.attributes:
for (
label_name,
label_value,
) in export_record.resource.attributes.items():
add_label(label_name, str(label_value))
if export_record.labels:
for [label_name, label_value] in export_record.labels:
add_label(label_name, label_value)

sample = Sample()
sample.timestamp = int(
export_record.aggregator.last_update_timestamp / 1000000
)
sample.value = value
timeseries.samples.append(sample)
return timeseries

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
Expand Down
Loading

0 comments on commit 8750f9d

Please sign in to comment.