Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus Remote Write Exporter (4/6) #212

Merged
merged 1 commit into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
- Add request methods
((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212])
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ package_dir=
=src
packages=find_namespace:
install_requires =
protobuf >= 3.13.0
requests == 2.25.0
opentelemetry-api == 0.17.dev0
opentelemetry-sdk == 0.17.dev0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import re
from typing import Dict, Sequence

import requests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should dependencies be added for python-snappy and requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue that I am running into is that I cannot add python-snappy to the setup.cfg because it depends on the snappy library in C which I had to brew install to get. Is there a way to add a C dependency that a module requires? If not I might have to specify to first manually install snappy library and module before using exporter in README.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excluded python-snappy from install-requires for now.


import snappy
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
Expand Down Expand Up @@ -48,7 +51,7 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
endpoint: url where data will be sent (Required)
basic_auth: username and password for authentication (Optional)
headers: additional headers for remote write request (Optional)
timeout: timeout for requests to the remote write endpoint in seconds (Optional)
timeout: timeout for remote write requests in seconds, defaults to 30 (Optional)
proxies: dict mapping request proxy protocols to proxy urls (Optional)
tls_config: configuration for remote write TLS settings (Optional)
"""
Expand Down Expand Up @@ -96,15 +99,15 @@ def basic_auth(self, basic_auth: Dict):
if basic_auth:
if "username" not in basic_auth:
raise ValueError("username required in basic_auth")
if (
"password" not in basic_auth
and "password_file" not in basic_auth
):
if "password_file" in basic_auth:
if "password" in basic_auth:
raise ValueError(
"basic_auth cannot contain password and password_file"
)
with open(basic_auth["password_file"]) as file:
basic_auth["password"] = file.readline().strip()
elif "password" not in basic_auth:
raise ValueError("password required in basic_auth")
if "password" in basic_auth and "password_file" in basic_auth:
raise ValueError(
"basic_auth cannot contain password and password_file"
)
self._basic_auth = basic_auth

@property
Expand Down Expand Up @@ -159,10 +162,20 @@ def headers(self, headers: Dict):
def export(
self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult:
raise NotImplementedError()
if not export_records:
return MetricsExportResult.SUCCESS
timeseries = self._convert_to_timeseries(export_records)
if not timeseries:
logger.error(
"All records contain unsupported aggregators, export aborted"
)
return MetricsExportResult.FAILURE
lzchen marked this conversation as resolved.
Show resolved Hide resolved
message = self._build_message(timeseries)
headers = self._build_headers()
return self._send_message(message, headers)

def shutdown(self) -> None:
raise NotImplementedError()
pass

def _convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
Expand Down Expand Up @@ -304,13 +317,60 @@ def add_label(label_name: str, label_value: str):
timeseries.samples.append(sample)
return timeseries

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()

def get_headers(self) -> Dict:
raise NotImplementedError()
def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
write_request = WriteRequest()
write_request.timeseries.extend(timeseries)
serialized_message = write_request.SerializeToString()
return snappy.compress(serialized_message)

def _build_headers(self) -> Dict:
headers = {
"Content-Encoding": "snappy",
"Content-Type": "application/x-protobuf",
"X-Prometheus-Remote-Write-Version": "0.1.0",
}
if self.headers:
for header_name, header_value in self.headers.items():
headers[header_name] = header_value
return headers

def send_message(
def _send_message(
self, message: bytes, headers: Dict
) -> MetricsExportResult:
raise NotImplementedError()
auth = None
if self.basic_auth:
auth = (self.basic_auth["username"], self.basic_auth["password"])

cert = None
verify = True
if self.tls_config:
if "ca_file" in self.tls_config:
verify = self.tls_config["ca_file"]
elif "insecure_skip_verify" in self.tls_config:
verify = self.tls_config["insecure_skip_verify"]

if (
"cert_file" in self.tls_config
and "key_file" in self.tls_config
):
cert = (
self.tls_config["cert_file"],
self.tls_config["key_file"],
)
try:
response = requests.post(
self.endpoint,
data=message,
headers=headers,
auth=auth,
timeout=self.timeout,
proxies=self.proxies,
cert=cert,
verify=verify,
)
if not response.ok:
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error("Export POST request failed with reason: %s", e)
return MetricsExportResult.FAILURE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to be handling retries? Timeouts?, etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout can be adjusted, but if a request fails (timeout or otherwise), we just report failure instead of retrying to avoid requests getting clogged due to records being exported continuously. If we set a timeout, but then retried multiple times, the timeout would no longer limit the time spent attempting to export one set of records. Our reasoning was that missing data is not as bad as delayed data especially in cases of cumulative data where missing data can be interpolated or cases where data is exported on a very short interval. Does not retrying failed requests sound reasonable for this use case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on how robust we want the functionality of this exporter. We can handle it in a different PR in the future if is needed.

return MetricsExportResult.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.16.dev0"
__version__ = "0.17.dev0"
Original file line number Diff line number Diff line change
Expand Up @@ -360,22 +360,55 @@ def create_label(name, value):
class TestExport(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self.exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

# Ensures export is successful with valid export_records and config
def test_export(self):
pass

def test_valid_send_message(self):
pass
@patch("requests.post")
def test_valid_export(self, mock_post):
mock_post.return_value.configure_mock(**{"status_code": 200})
test_metric = Counter("testname", "testdesc", "testunit", int, None)
labels = get_dict_as_key({"environment": "testing"})
record = ExportRecord(
test_metric, labels, SumAggregator(), Resource({})
)
result = self.exporter.export([record])
self.assertIs(result, MetricsExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 1)

result = self.exporter.export([])
self.assertIs(result, MetricsExportResult.SUCCESS)

def test_invalid_export(self):
record = ExportRecord(None, None, None, None)
result = self.exporter.export([record])
self.assertIs(result, MetricsExportResult.FAILURE)

@patch("requests.post")
def test_valid_send_message(self, mock_post):
mock_post.return_value.configure_mock(**{"ok": True})
result = self.exporter._send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.SUCCESS)

def test_invalid_send_message(self):
pass
result = self.exporter._send_message(bytes(), {})
self.assertEqual(result, MetricsExportResult.FAILURE)

# Verifies that build_message calls snappy.compress and returns SerializedString
def test_build_message(self):
pass
@patch("snappy.compress", return_value=bytes())
def test_build_message(self, mock_compress):
message = self.exporter._build_message([TimeSeries()])
self.assertEqual(mock_compress.call_count, 1)
self.assertIsInstance(message, bytes)

# Ensure correct headers are added when valid config is provided
def test_get_headers(self):
pass
def test_build_headers(self):
self.exporter.headers = {"Custom Header": "test_header"}

headers = self.exporter._build_headers()
self.assertEqual(headers["Content-Encoding"], "snappy")
self.assertEqual(headers["Content-Type"], "application/x-protobuf")
self.assertEqual(headers["X-Prometheus-Remote-Write-Version"], "0.1.0")
self.assertEqual(headers["Custom Header"], "test_header")