Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added request methods
Browse files Browse the repository at this point in the history
shovnik committed Nov 24, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent e9e348d commit ac5f46f
Showing 3 changed files with 122 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -9,3 +9,5 @@
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
- Add request methods
((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212])
Original file line number Diff line number Diff line change
@@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from typing import Dict, Sequence

import requests

import snappy
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
@@ -36,6 +40,8 @@
ValueObserverAggregator,
)

logger = logging.getLogger(__name__)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
"""
@@ -148,10 +154,13 @@ def headers(self, headers: Dict):
def export(
self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult:
raise NotImplementedError()
timeseries = self.convert_to_timeseries(export_records)
message = self.build_message(timeseries)
headers = self.get_headers()
return self.send_message(message, headers)

def shutdown(self) -> None:
raise NotImplementedError()
pass

def convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
@@ -280,12 +289,66 @@ def create_label(self, name: str, value: str) -> Label:
return label

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
write_request = WriteRequest()
write_request.timeseries.extend(timeseries)
serialized_message = write_request.SerializeToString()
return snappy.compress(serialized_message)

def get_headers(self) -> Dict:
raise NotImplementedError()
headers = {
"Content-Encoding": "snappy",
"Content-Type": "application/x-protobuf",
"X-Prometheus-Remote-Write-Version": "0.1.0",
}
if self.headers:
for header_name, header_value in self.headers.items():
headers[header_name] = header_value
return headers

def send_message(
self, message: bytes, headers: Dict
) -> MetricsExportResult:
raise NotImplementedError()
auth = None
if self.basic_auth:
basic_auth = self.basic_auth
if "password" in basic_auth:
auth = (basic_auth.username, basic_auth.password)
else:
with open(basic_auth.password_file) as file:
auth = (basic_auth.username, file.readline())

cert = None
verify = True
if self.tls_config:
if "ca_file" in self.tls_config:
verify = self.tls_config["ca_file"]
elif "insecure_skip_verify" in self.tls_config:
verify = self.tls_config["insecure_skip_verify"]

if (
"cert_file" in self.tls_config
and "key_file" in self.tls_config
):
cert = (
self.tls_config["cert_file"],
self.tls_config["key_file"],
)
response = requests.post(
self.endpoint,
data=message,
headers=headers,
auth=auth,
timeout=self.timeout,
proxies=self.proxies,
cert=cert,
verify=verify,
)
if response.status_code != 200:
logger.warning(
"POST request failed with status %s with reason: %s and content: %s",
str(response.status_code),
response.reason,
str(response.content),
)
return MetricsExportResult.FAILURE
return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
@@ -313,25 +313,65 @@ def test_create_timeseries(self):
self.assertEqual(timeseries, expected_timeseries)


class ResponseStub:
def __init__(self, status_code):
self.status_code = status_code
self.reason = "dummy_reason"
self.content = "dummy_content"


class TestExport(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self._exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

# Ensures export is successful with valid export_records and config
def test_export(self):
pass

def test_valid_send_message(self):
pass

def test_invalid_send_message(self):
pass
@mock.patch("requests.post", return_value=ResponseStub(200))
def test_export(self, mock_post):
test_metric = Counter("testname", "testdesc", "testunit", int, None)
labels = {"environment": "testing"}
record = ExportRecord(
test_metric, labels, SumAggregator(), Resource({}),
)
result = self._exporter.export([record])
self.assertIs(result, MetricsExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 1)

@mock.patch("requests.post", return_value=ResponseStub(200))
def test_valid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.SUCCESS)

@mock.patch("requests.post", return_value=ResponseStub(404))
def test_invalid_send_message(self, mock_post):
result = self._exporter.send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.FAILURE)

# Verifies that build_message calls snappy.compress and returns SerializedString
def test_build_message(self):
pass
@mock.patch("snappy.compress", return_value=bytes())
def test_build_message(self, mock_compress):
test_timeseries = [
TimeSeries(),
TimeSeries(),
]
message = self._exporter.build_message(test_timeseries)
self.assertEqual(mock_compress.call_count, 1)
self.assertIsInstance(message, bytes)

# Ensure correct headers are added when valid config is provided
def test_get_headers(self):
pass
self._exporter.headers = {"Custom Header": "test_header"}

headers = self._exporter.get_headers()
self.assertEqual(headers.get("Content-Encoding", ""), "snappy")
self.assertEqual(
headers.get("Content-Type", ""), "application/x-protobuf"
)
self.assertEqual(
headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0"
)
self.assertEqual(headers.get("Custom Header", ""), "test_header")

0 comments on commit ac5f46f

Please sign in to comment.