diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md index 323ab2817d..65b0f6cdc5 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md @@ -7,3 +7,5 @@ ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206]) - Add conversion to TimeSeries methods ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207]) +- Add request methods + ((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212]) diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index eadacd6e01..de6f14bcc6 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -16,6 +16,9 @@ import re from typing import Dict, Sequence +import requests + +import snappy from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( WriteRequest, ) @@ -96,15 +99,15 @@ def basic_auth(self, basic_auth: Dict): if basic_auth: if "username" not in basic_auth: raise ValueError("username required in basic_auth") - if ( - "password" not in basic_auth - and "password_file" not in basic_auth - ): + if "password_file" in basic_auth: + if "password" in basic_auth: + raise ValueError( + "basic_auth cannot contain password and password_file" + ) + with open(basic_auth["password_file"]) as file: + basic_auth["password"] = file.readline().strip() + elif "password" not in basic_auth: raise ValueError("password required in basic_auth") - if "password" in basic_auth and "password_file" in basic_auth: - raise ValueError( - "basic_auth cannot contain password and password_file" - ) self._basic_auth = basic_auth @property @@ -159,10 +162,16 @@ def headers(self, headers: Dict): def export( self, export_records: Sequence[ExportRecord] ) -> MetricsExportResult: - raise NotImplementedError() + timeseries = self._convert_to_timeseries(export_records) + if not timeseries: + logger.warning("No valid records found, export aborted") + return MetricsExportResult.FAILURE + message = self._build_message(timeseries) + headers = self._build_headers() + return self._send_message(message, headers) def shutdown(self) -> None: - raise NotImplementedError() + pass def _convert_to_timeseries( self, export_records: Sequence[ExportRecord] @@ -304,13 +313,62 @@ def add_label(label_name: str, label_value: str): timeseries.samples.append(sample) return timeseries - def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: - raise NotImplementedError() - - def get_headers(self) -> Dict: - raise NotImplementedError() + def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: + write_request = WriteRequest() + write_request.timeseries.extend(timeseries) + serialized_message = write_request.SerializeToString() + return snappy.compress(serialized_message) + + def _build_headers(self) -> Dict: + headers = { + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + } + if self.headers: + for header_name, header_value in self.headers.items(): + headers[header_name] = header_value + return headers - def send_message( + def _send_message( self, message: bytes, headers: Dict ) -> MetricsExportResult: - raise NotImplementedError() + auth = None + if self.basic_auth: + auth = (self.basic_auth["username"], self.basic_auth["password"]) + + cert = None + verify = True + if self.tls_config: + if "ca_file" in self.tls_config: + verify = self.tls_config["ca_file"] + elif "insecure_skip_verify" in self.tls_config: + verify = self.tls_config["insecure_skip_verify"] + + if ( + "cert_file" in self.tls_config + and "key_file" in self.tls_config + ): + cert = ( + self.tls_config["cert_file"], + self.tls_config["key_file"], + ) + response = requests.post( + self.endpoint, + data=message, + headers=headers, + auth=auth, + timeout=self.timeout, + proxies=self.proxies, + cert=cert, + verify=verify, + ) + if response.status_code != 200: + logger.warning( + "POST request failed with status %s with reason: %s and content: %s", + str(response.status_code), + response.reason, + str(response.content), + ) + return MetricsExportResult.FAILURE + return MetricsExportResult.SUCCESS diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index 1996f73417..819ee46591 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -360,22 +360,61 @@ def create_label(name, value): class TestExport(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self.exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) # Ensures export is successful with valid export_records and config - def test_export(self): - pass - - def test_valid_send_message(self): - pass - - def test_invalid_send_message(self): - pass + @patch("requests.post") + def test_valid_export(self, mock_post): + mock_post.return_value.configure_mock(**{"status_code": 200}) + test_metric = Counter("testname", "testdesc", "testunit", int, None) + labels = get_dict_as_key({"environment": "testing"}) + record = ExportRecord( + test_metric, labels, SumAggregator(), Resource({}) + ) + result = self.exporter.export([record]) + self.assertIs(result, MetricsExportResult.SUCCESS) + self.assertEqual(mock_post.call_count, 1) + + def test_invalid_export(self): + record = ExportRecord(None, None, None, None) + result = self.exporter.export([record]) + self.assertIs(result, MetricsExportResult.FAILURE) + + @patch("requests.post") + def test_valid_send_message(self, mock_post): + mock_post.return_value.configure_mock(**{"status_code": 200}) + result = self.exporter._send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.SUCCESS) + + @patch("requests.post") + def test_invalid_send_message(self, mock_post): + mock_post.return_value.configure_mock( + **{ + "status_code": 404, + "reason": "test_reason", + "content": "test_content", + } + ) + result = self.exporter._send_message(bytes(), {}) + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(result, MetricsExportResult.FAILURE) # Verifies that build_message calls snappy.compress and returns SerializedString - def test_build_message(self): - pass + @patch("snappy.compress", return_value=bytes()) + def test_build_message(self, mock_compress): + message = self.exporter._build_message([TimeSeries()]) + self.assertEqual(mock_compress.call_count, 1) + self.assertIsInstance(message, bytes) # Ensure correct headers are added when valid config is provided - def test_get_headers(self): - pass + def test_build_headers(self): + self.exporter.headers = {"Custom Header": "test_header"} + + headers = self.exporter._build_headers() + self.assertEqual(headers["Content-Encoding"], "snappy") + self.assertEqual(headers["Content-Type"], "application/x-protobuf") + self.assertEqual(headers["X-Prometheus-Remote-Write-Version"], "0.1.0") + self.assertEqual(headers["Custom Header"], "test_header")