Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus Remote Write Exporter (3/6) #207

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
([#180](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180))
- Add Exporter constructor validation methods
([#206](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206))
- Add conversion to TimeSeries methods
([#207](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207))
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from typing import Dict, Sequence

from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
Expand All @@ -24,6 +29,15 @@
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
Expand Down Expand Up @@ -55,6 +69,14 @@ def __init__(
self.tls_config = tls_config
self.proxies = proxies

self.converter_map = {
MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
SumAggregator: self._convert_from_sum,
HistogramAggregator: self._convert_from_histogram,
LastValueAggregator: self._convert_from_last_value,
ValueObserverAggregator: self._convert_from_value_observer,
}

@property
def endpoint(self):
return self._endpoint
Expand Down Expand Up @@ -142,50 +164,145 @@ def export(
def shutdown(self) -> None:
raise NotImplementedError()

def convert_to_timeseries(
def _convert_to_timeseries(
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
raise NotImplementedError()

def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
raise NotImplementedError()
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = self.converter_map.get(aggregator_type)
if converter:
timeseries.extend(converter(export_record))
else:
logger.warning(
"%s aggregator is not supported, record dropped",
aggregator_type,
)
return timeseries

def convert_from_min_max_sum_count(
def _convert_from_sum(
self, sum_record: ExportRecord
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
sum_record,
sum_record.instrument.name + "_sum",
sum_record.aggregator.checkpoint,
)
]

def _convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

def convert_from_histogram(
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count"]:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self._create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries

def _convert_from_histogram(
self, histogram_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
) -> Sequence[TimeSeries]:
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
bound_str = "+Inf" if bound == float("inf") else str(bound)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self._create_timeseries(
histogram_record,
histogram_record.instrument.name + "_histogram",
value,
extra_label=("le", bound_str),
)
)
return timeseries

def convert_from_last_value(
def _convert_from_last_value(
self, last_value_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

def convert_from_value_observer(
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
last_value_record,
last_value_record.instrument.name + "_last",
last_value_record.aggregator.checkpoint,
)
]

def _convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count", "last"]:
timeseries.append(
self._create_timeseries(
value_observer_record,
value_observer_record.instrument.name + "_" + agg_type,
getattr(
value_observer_record.aggregator.checkpoint, agg_type
),
)
)
return timeseries

def convert_from_quantile(
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def _convert_from_quantile(
self, summary_record: ExportRecord
) -> TimeSeries:
) -> Sequence[TimeSeries]:
raise NotImplementedError()

# pylint: disable=no-member
def create_timeseries(
self, export_record: ExportRecord, name, value: float
def _create_timeseries(
self,
export_record: ExportRecord,
name: str,
value: float,
extra_label: (str, str) = None,
) -> TimeSeries:
raise NotImplementedError()

def create_sample(self, timestamp: int, value: float) -> Sample:
raise NotImplementedError()
timeseries = TimeSeries()
seen = set()

def add_label(label_name: str, label_value: str):
# Label name must contain only alphanumeric characters and underscores
label_name = re.sub("[^\\w_]", "_", label_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we might get labels that are supposed to be unique but will be dropped due to removal of these characters. Is the alphanumeric + underscores limitation of the labels specific for Prometheus?

e.g.
testing and ${testing}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limitation is specific to the TimeSeries type https://prometheus.io/docs/concepts/data_model/ . You bring up a valid point. Since the non-alphanumeric characters are replaced with underscores: '$testing' and '_testing' will be considered the same label which would be an issue. Also, a label_name cannot start with two underscores. Would it be better to validate label names (raise Error) or add more complex sanitizing? I am not quite sure how to prevent duplicates when addressing label names that contain non-alphanumeric characters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the limitation is Prometheus specific, I am okay with the current implementation. The only problem was if it were a limitation we imposed on the OT side, then Prometheus users would have unexpected behaviour. But if it's on the Prometheus side, then user's should EXPECT this behaviour.

I wonder in this case if we should DROP labels that defy the TimeSeries definition (and maybe log a warning), instead of only dropping the duplicate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the rare case this occurs, I think whoever is misusing the exporter will value the label with the non-ascii characters as much as the valid one, so I suggest dropping the duplicate but logging a warning that after sanitizing, a duplicate label had to be dropped.

if label_name not in seen:
label = Label()
label.name = label_name
label.value = label_value
timeseries.labels.append(label)
seen.add(label_name)
else:
logger.warning(
"Duplicate label with name %s and value %s",
label_name,
label_value,
)

def create_label(self, name: str, value: str) -> Label:
raise NotImplementedError()
# The __name__ label is required by PromQL as its value appears as the metric_name
add_label("__name__", name)
lzchen marked this conversation as resolved.
Show resolved Hide resolved
if extra_label:
add_label(extra_label[0], extra_label[1])
if export_record.resource.attributes:
for (
label_name,
label_value,
) in export_record.resource.attributes.items():
add_label(label_name, str(label_value))
if export_record.labels:
for [label_name, label_value] in export_record.labels:
add_label(label_name, label_value)

sample = Sample()
sample.timestamp = int(
export_record.aggregator.last_update_timestamp / 1000000
)
sample.value = value
timeseries.samples.append(sample)
return timeseries

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
Expand Down
Loading