From c79a4c9a89547e02780d5bcf5f3ab2a699663323 Mon Sep 17 00:00:00 2001 From: Shovnik Bhattacharya Date: Tue, 24 Nov 2020 17:17:45 -0500 Subject: [PATCH] Added export request methods --- .../CHANGELOG.md | 2 + .../setup.cfg | 2 + .../prometheus_remote_write/__init__.py | 96 +++++++++++++++---- .../test_prometheus_remote_write_exporter.py | 55 ++++++++--- 4 files changed, 126 insertions(+), 29 deletions(-) diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md index 323ab2817d..65b0f6cdc5 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md @@ -7,3 +7,5 @@ ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206]) - Add conversion to TimeSeries methods ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207]) +- Add request methods + ((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212]) diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/setup.cfg b/exporter/opentelemetry-exporter-prometheus-remote-write/setup.cfg index 7d2869022b..bed525307d 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/setup.cfg +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/setup.cfg @@ -39,6 +39,8 @@ package_dir= =src packages=find_namespace: install_requires = + protobuf >= 3.13.0 + requests == 2.25.0 opentelemetry-api == 0.16.dev0 opentelemetry-sdk == 0.16.dev0 diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index eadacd6e01..be1292f764 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -16,6 +16,9 @@ import re from typing import Dict, Sequence +import requests + +import snappy from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( WriteRequest, ) @@ -48,7 +51,7 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter): endpoint: url where data will be sent (Required) basic_auth: username and password for authentication (Optional) headers: additional headers for remote write request (Optional) - timeout: timeout for requests to the remote write endpoint in seconds (Optional) + timeout: timeout for remote write requests in seconds, defaults to 30 (Optional) proxies: dict mapping request proxy protocols to proxy urls (Optional) tls_config: configuration for remote write TLS settings (Optional) """ @@ -96,15 +99,15 @@ def basic_auth(self, basic_auth: Dict): if basic_auth: if "username" not in basic_auth: raise ValueError("username required in basic_auth") - if ( - "password" not in basic_auth - and "password_file" not in basic_auth - ): + if "password_file" in basic_auth: + if "password" in basic_auth: + raise ValueError( + "basic_auth cannot contain password and password_file" + ) + with open(basic_auth["password_file"]) as file: + basic_auth["password"] = file.readline().strip() + elif "password" not in basic_auth: raise ValueError("password required in basic_auth") - if "password" in basic_auth and "password_file" in basic_auth: - raise ValueError( - "basic_auth cannot contain password and password_file" - ) self._basic_auth = basic_auth @property @@ -159,10 +162,20 @@ def headers(self, headers: Dict): def export( self, export_records: Sequence[ExportRecord] ) -> MetricsExportResult: - raise NotImplementedError() + if not export_records: + return MetricsExportResult.SUCCESS + timeseries = self._convert_to_timeseries(export_records) + if not timeseries: + logger.error( + "All records contain unsupported aggregators, export aborted" + ) + return MetricsExportResult.FAILURE + message = self._build_message(timeseries) + headers = self._build_headers() + return self._send_message(message, headers) def shutdown(self) -> None: - raise NotImplementedError() + pass def _convert_to_timeseries( self, export_records: Sequence[ExportRecord] @@ -304,13 +317,60 @@ def add_label(label_name: str, label_value: str): timeseries.samples.append(sample) return timeseries - def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: - raise NotImplementedError() - - def get_headers(self) -> Dict: - raise NotImplementedError() + def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: + write_request = WriteRequest() + write_request.timeseries.extend(timeseries) + serialized_message = write_request.SerializeToString() + return snappy.compress(serialized_message) + + def _build_headers(self) -> Dict: + headers = { + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + } + if self.headers: + for header_name, header_value in self.headers.items(): + headers[header_name] = header_value + return headers - def send_message( + def _send_message( self, message: bytes, headers: Dict ) -> MetricsExportResult: - raise NotImplementedError() + auth = None + if self.basic_auth: + auth = (self.basic_auth["username"], self.basic_auth["password"]) + + cert = None + verify = True + if self.tls_config: + if "ca_file" in self.tls_config: + verify = self.tls_config["ca_file"] + elif "insecure_skip_verify" in self.tls_config: + verify = self.tls_config["insecure_skip_verify"] + + if ( + "cert_file" in self.tls_config + and "key_file" in self.tls_config + ): + cert = ( + self.tls_config["cert_file"], + self.tls_config["key_file"], + ) + try: + response = requests.post( + self.endpoint, + data=message, + headers=headers, + auth=auth, + timeout=self.timeout, + proxies=self.proxies, + cert=cert, + verify=verify, + ) + if not response.ok: + response.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Export POST request failed with reason: %s", e) + return MetricsExportResult.FAILURE + return MetricsExportResult.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index 1996f73417..4b4bb95820 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -360,22 +360,55 @@ def create_label(name, value): class TestExport(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self.exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) # Ensures export is successful with valid export_records and config - def test_export(self): - pass - - def test_valid_send_message(self): - pass + @patch("requests.post") + def test_valid_export(self, mock_post): + mock_post.return_value.configure_mock(**{"status_code": 200}) + test_metric = Counter("testname", "testdesc", "testunit", int, None) + labels = get_dict_as_key({"environment": "testing"}) + record = ExportRecord( + test_metric, labels, SumAggregator(), Resource({}) + ) + result = self.exporter.export([record]) + self.assertIs(result, MetricsExportResult.SUCCESS) + self.assertEqual(mock_post.call_count, 1) + + result = self.exporter.export([]) + self.assertIs(result, MetricsExportResult.SUCCESS) + + def test_invalid_export(self): + record = ExportRecord(None, None, None, None) + result = self.exporter.export([record]) + self.assertIs(result, MetricsExportResult.FAILURE) + + @patch("requests.post") + def test_valid_send_message(self, mock_post): + mock_post.return_value.configure_mock(**{"ok": True}) + result = self.exporter._send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.SUCCESS) def test_invalid_send_message(self): - pass + result = self.exporter._send_message(bytes(), {}) + self.assertEqual(result, MetricsExportResult.FAILURE) # Verifies that build_message calls snappy.compress and returns SerializedString - def test_build_message(self): - pass + @patch("snappy.compress", return_value=bytes()) + def test_build_message(self, mock_compress): + message = self.exporter._build_message([TimeSeries()]) + self.assertEqual(mock_compress.call_count, 1) + self.assertIsInstance(message, bytes) # Ensure correct headers are added when valid config is provided - def test_get_headers(self): - pass + def test_build_headers(self): + self.exporter.headers = {"Custom Header": "test_header"} + + headers = self.exporter._build_headers() + self.assertEqual(headers["Content-Encoding"], "snappy") + self.assertEqual(headers["Content-Type"], "application/x-protobuf") + self.assertEqual(headers["X-Prometheus-Remote-Write-Version"], "0.1.0") + self.assertEqual(headers["Custom Header"], "test_header")