Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OTLP/HTTP log exporter #2462

Merged
merged 34 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e021891
WIP: otlp/htpp logging exporter
srikanthccv Feb 11, 2022
3a012d5
Add tests and fix bug
srikanthccv Feb 12, 2022
ebe6f66
Add CHANGELOG entry
srikanthccv Feb 13, 2022
3ac474c
Merge branch 'main' into issue-2446
srikanthccv Feb 13, 2022
e96d536
Fix lint
srikanthccv Feb 13, 2022
a97c18a
Merge branch 'issue-2446' of github.com:lonewolf3739/opentelemetry-py…
srikanthccv Feb 13, 2022
503acc4
Merge branch 'main' into issue-2446
srikanthccv Feb 17, 2022
912ae2f
Merge branch 'main' into issue-2446
srikanthccv Feb 18, 2022
c9f42ef
Merge branch 'main' into issue-2446
srikanthccv Feb 19, 2022
7fa922c
Resolve merge conflicts
srikanthccv Feb 24, 2022
26b26f2
Fix lint
srikanthccv Feb 24, 2022
b7af733
Merge branch 'issue-2446' of github.com:srikanthccv/opentelemetry-pyt…
srikanthccv Feb 24, 2022
8253b4f
Merge branch 'main' into issue-2446
srikanthccv Feb 25, 2022
1f1f324
Merge branch 'main' into issue-2446
srikanthccv Feb 26, 2022
b3fcb88
Merge branch 'main' into issue-2446
srikanthccv Mar 1, 2022
e033784
Merge branch 'main' into issue-2446
srikanthccv Apr 2, 2022
83953be
Update CHANGELOG
srikanthccv Apr 2, 2022
9ee2808
resolve merge conflicts
srikanthccv May 8, 2022
47f3a0a
Update with latest proto
srikanthccv May 8, 2022
428971b
Merge branch 'main' into issue-2446
srikanthccv May 18, 2022
7aec8c7
rearrange the changelog
srikanthccv May 18, 2022
8328f44
Merge branch 'main' into issue-2446
srikanthccv May 26, 2022
250e2db
Merge branch 'main' into issue-2446
srikanthccv May 26, 2022
3037c8a
Merge branch 'main' into issue-2446
srikanthccv May 31, 2022
5da5ad6
Merge branch 'main' into issue-2446
srikanthccv May 31, 2022
f18792c
Merge branch 'main' into issue-2446
srikanthccv Jun 2, 2022
66f155b
Merge branch 'main' into issue-2446
srikanthccv Jun 2, 2022
a8ae93e
Merge branch 'main' into issue-2446
srikanthccv Jun 8, 2022
fa81455
Append v1/logs for OTLP endpoint
srikanthccv Jun 10, 2022
5231c43
Merge branch 'issue-2446' of github.com:srikanthccv/opentelemetry-pyt…
srikanthccv Jun 10, 2022
bbc8324
Merge branch 'main' into issue-2446
srikanthccv Jun 10, 2022
85de539
Merge branch 'main' into issue-2446
srikanthccv Jun 11, 2022
624f17f
Merge branch 'main' into issue-2446
srikanthccv Jun 12, 2022
9078264
Fix lint
srikanthccv Jun 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc1-0.31b0...HEAD)

- `opentelemetry-exporter-otlp-proto-http` Add support for OTLP/HTTP log exporter
([#2462](https://github.com/open-telemetry/opentelemetry-python/pull/2462))
- Fix yield of `None`-valued points
([#2745](https://github.com/open-telemetry/opentelemetry-python/pull/2745))
- Add missing `to_json` methods
Expand Down Expand Up @@ -115,7 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
pages that have moved, see
[#2453](https://github.com/open-telemetry/opentelemetry-python/pull/2453), and
[#2498](https://github.com/open-telemetry/opentelemetry-python/pull/2498).
- `opentelemetry-exporter-otlp-grpc` update SDK dependency to ~1.9.
- `opentelemetry-exporter-otlp-proto-grpc` update SDK dependency to ~1.9.
([#2442](https://github.com/open-telemetry/opentelemetry-python/pull/2442))
- bugfix(auto-instrumentation): attach OTLPHandler to root logger
([#2450](https://github.com/open-telemetry/opentelemetry-python/pull/2450))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import logging
import zlib
from io import BytesIO
from os import environ
from typing import Dict, Optional, Sequence
from time import sleep

import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk._logs.export import (
LogExporter,
LogExportResult,
LogData,
)
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import (
_ProtobufEncoder,
)
from opentelemetry.util.re import parse_headers


_logger = logging.getLogger(__name__)


DEFAULT_COMPRESSION = Compression.NoCompression
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds


class OTLPLogExporter(LogExporter):
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

_MAX_RETRY_TIMEOUT = 64

def __init__(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
self._endpoint = endpoint or _append_logs_path(
environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
)
self._certificate_file = certificate_file or environ.get(
OTEL_EXPORTER_OTLP_CERTIFICATE, True
)
headers_string = environ.get(OTEL_EXPORTER_OTLP_HEADERS, "")
self._headers = headers or parse_headers(headers_string)
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT)
)
self._compression = compression or _compression_from_env()
self._session = requests.Session()
self._session.headers.update(self._headers)
self._session.headers.update(
{"Content-Type": _ProtobufEncoder._CONTENT_TYPE}
)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False

def _export(self, serialized_data: str):
data = serialized_data
if self._compression == Compression.Gzip:
gzip_data = BytesIO()
with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
gzip_stream.write(serialized_data)
data = gzip_data.getvalue()
elif self._compression == Compression.Deflate:
data = zlib.compress(bytes(serialized_data))

return self._session.post(
url=self._endpoint,
data=data,
verify=self._certificate_file,
timeout=self._timeout,
)

@staticmethod
def _retryable(resp: requests.Response) -> bool:
if resp.status_code == 408:
return True
if resp.status_code >= 500 and resp.status_code <= 599:
return True
return False

def export(self, batch: Sequence[LogData]) -> LogExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return LogExportResult.FAILURE

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE

resp = self._export(serialized_data)
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
# pylint: disable=no-else-return
if resp.status_code in (200, 202):
return LogExportResult.SUCCESS
elif self._retryable(resp):
_logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
resp.reason,
delay,
)
sleep(delay)
continue
else:
_logger.error(
"Failed to export logs batch code: %s, reason: %s",
resp.status_code,
resp.text,
)
return LogExportResult.FAILURE
return LogExportResult.FAILURE

def shutdown(self):
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring call")
return
self._session.close()
self._shutdown = True


def _compression_from_env() -> Compression:
compression = (
environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none").lower().strip()
)
return Compression(compression)


def _append_logs_path(endpoint: str) -> str:
if endpoint.endswith("/"):
return endpoint + DEFAULT_LOGS_EXPORT_PATH
return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Sequence, List

from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
from opentelemetry.proto.logs.v1.logs_pb2 import (
ScopeLogs,
ResourceLogs,
)
from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord
from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import (
_encode_instrumentation_scope,
_encode_resource,
_encode_span_id,
_encode_trace_id,
_encode_value,
_encode_attributes,
)


from opentelemetry.sdk._logs.export import LogData


class _ProtobufEncoder:
_CONTENT_TYPE = "application/x-protobuf"

@classmethod
def serialize(cls, batch: Sequence[LogData]) -> str:
return cls.encode(batch).SerializeToString()

@staticmethod
def encode(batch: Sequence[LogData]) -> ExportLogsServiceRequest:
return ExportLogsServiceRequest(
resource_logs=_encode_resource_logs(batch)
)


def _encode_log(log_data: LogData) -> PB2LogRecord:
kwargs = {}
kwargs["time_unix_nano"] = log_data.log_record.timestamp
kwargs["span_id"] = _encode_span_id(log_data.log_record.span_id)
kwargs["trace_id"] = _encode_trace_id(log_data.log_record.trace_id)
kwargs["flags"] = int(log_data.log_record.trace_flags)
kwargs["body"] = _encode_value(log_data.log_record.body)
kwargs["severity_text"] = log_data.log_record.severity_text
kwargs["attributes"] = _encode_attributes(log_data.log_record.attributes)
kwargs["severity_number"] = log_data.log_record.severity_number.value

return PB2LogRecord(**kwargs)


def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]:
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

sdk_resource_logs = {}

for sdk_log in batch:
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
sdk_resource = sdk_log.log_record.resource
sdk_instrumentation = sdk_log.instrumentation_scope or None
pb2_log = _encode_log(sdk_log)

if sdk_resource not in sdk_resource_logs.keys():
sdk_resource_logs[sdk_resource] = {sdk_instrumentation: [pb2_log]}
elif sdk_instrumentation not in sdk_resource_logs[sdk_resource].keys():
sdk_resource_logs[sdk_resource][sdk_instrumentation] = [pb2_log]
else:
sdk_resource_logs[sdk_resource][sdk_instrumentation].append(
pb2_log
)

pb2_resource_logs = []

for sdk_resource, sdk_instrumentations in sdk_resource_logs.items():
scope_logs = []
for sdk_instrumentation, pb2_logs in sdk_instrumentations.items():
scope_logs.append(
ScopeLogs(
scope=(_encode_instrumentation_scope(sdk_instrumentation)),
log_records=pb2_logs,
)
)
pb2_resource_logs.append(
ResourceLogs(
resource=_encode_resource(sdk_resource),
scope_logs=scope_logs,
)
)

return pb2_resource_logs
Loading