Skip to content

Commit

Permalink
Heavily refactored tests and code after review and fixed issus in His…
Browse files Browse the repository at this point in the history
…togram Conversion
  • Loading branch information
shovnik committed Nov 27, 2020
1 parent df62a17 commit f971041
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,131 +153,144 @@ def export(
def shutdown(self) -> None:
raise NotImplementedError()

def convert_to_timeseries(
def _convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
converter_map = {
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
SumAggregator: self.convert_from_sum,
HistogramAggregator: self.convert_from_histogram,
LastValueAggregator: self.convert_from_last_value,
ValueObserverAggregator: self.convert_from_last_value,
MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
SumAggregator: self._convert_from_sum,
HistogramAggregator: self._convert_from_histogram,
LastValueAggregator: self._convert_from_last_value,
ValueObserverAggregator: self._convert_from_value_observer,
}
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = converter_map.get(aggregator_type)
if not converter:
if converter is None:
raise ValueError(
str(aggregator_type) + " conversion is not supported"
)
timeseries.extend(converter(export_record))
return timeseries

def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
name = sum_record.instrument.name
value = sum_record.aggregator.checkpoint
return [self.create_timeseries(sum_record, name, value)]
def _convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
return [
self._create_timeseries(
sum_record,
sum_record.instrument.name,
sum_record.aggregator.checkpoint,
)
]

def convert_from_min_max_sum_count(
def _convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> TimeSeries:
timeseries = []
agg_types = ["min", "max", "sum", "count"]
for agg_type in agg_types:
for agg_type in ["min", "max", "sum", "count"]:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self.create_timeseries(min_max_sum_count_record, name, value)
self._create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries

def convert_from_histogram(
def _convert_from_histogram(
self, histogram_record: ExportRecord
) -> TimeSeries:
count = 0
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
bb = "+Inf" if bound == float("inf") else str(bound)
name = (
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
)
bound_str = "+Inf" if bound == float("inf") else str(bound)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self.create_timeseries(histogram_record, name, value)
self._create_timeseries(
histogram_record,
histogram_record.instrument.name,
value,
labels=[("le", bound_str)],
)
)
count += value
name = histogram_record.instrument.name + "_count"
timeseries.append(
self.create_timeseries(histogram_record, name, float(count))
self._create_timeseries(histogram_record, name, float(count))
)
return timeseries

def convert_from_last_value(
def _convert_from_last_value(
self, last_value_record: ExportRecord
) -> TimeSeries:
name = last_value_record.instrument.name
value = last_value_record.aggregator.checkpoint
return [self.create_timeseries(last_value_record, name, value)]
return [
self._create_timeseries(
last_value_record,
last_value_record.instrument.name,
last_value_record.aggregator.checkpoint,
)
]

def convert_from_value_observer(
def _convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> TimeSeries:
timeseries = []
agg_types = ["min", "max", "sum", "count", "last"]
for agg_type in agg_types:
name = value_observer_record.instrument.name + "_" + agg_type
value = getattr(
value_observer_record.aggregator.checkpoint, agg_type
)
for agg_type in ["min", "max", "sum", "count", "last"]:
timeseries.append(
self.create_timeseries(value_observer_record, name, value)
self._create_timeseries(
value_observer_record,
value_observer_record.instrument.name + "_" + agg_type,
getattr(
value_observer_record.aggregator.checkpoint, agg_type
),
)
)
return timeseries

# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def convert_from_quantile(
def _convert_from_quantile(
self, summary_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

# pylint: disable=no-member
def create_timeseries(
self, export_record: ExportRecord, name, value: float
def _create_timeseries(
self, export_record: ExportRecord, name, value: float, labels=None
) -> TimeSeries:
timeseries = TimeSeries()
# Add name label, record labels and resource labels
timeseries.labels.append(self.create_label("__name__", name))
resource_attributes = export_record.resource.attributes
for label_name, label_value in resource_attributes.items():
timeseries.labels.append(
self.create_label(label_name, label_value)
)
for label in export_record.labels:
if label[0] not in resource_attributes.keys():
timeseries.labels.append(self.create_label(label[0], label[1]))
# Add sample
timeseries.samples.append(
self.create_sample(
export_record.aggregator.last_update_timestamp, value
)
)
return timeseries
seen = set()

def add_label(label_name, label_value):
# Label name must contain only alphanumeric characters and underscores
label_name = re.sub("[^\\w_]", "_", label_name)
if label_name not in seen:
label = Label()
label.name = label_name
label.value = label_value
timeseries.labels.append(label)
seen.add(label_name)

add_label("__name__", name)
if labels:
for [label_name, label_value] in labels:
add_label(label_name, label_value)
if export_record.resource.attributes:
for (
label_name,
label_value,
) in export_record.resource.attributes.items():
add_label(label_name, label_value)
if export_record.labels:
for [label_name, label_value] in export_record.labels:
add_label(label_name, label_value)

def create_sample(self, timestamp: int, value: float) -> Sample:
sample = Sample()
sample.timestamp = int(timestamp / 1000000)
sample.timestamp = int(
export_record.aggregator.last_update_timestamp / 1000000
)
sample.value = value
return sample

def create_label(self, name: str, value: str) -> Label:
label = Label()
# Label name must contain only alphanumeric characters and underscores
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
label.value = value
return label
timeseries.samples.append(sample)
return timeseries

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
Expand Down
Loading

0 comments on commit f971041

Please sign in to comment.