Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OTEL_EXPORTER_JAEGER_TIMEOUT #1863

Merged
merged 16 commits into from
Jun 4, 2021
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829))
- Lazily read/configure limits and allow limits to be unset.
([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839))
- Added support for OTEL_EXPORTER_JAEGER_TIMEOUT
([#1863](https://github.com/open-telemetry/opentelemetry-python/pull/1863))

### Changed
- Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

- :envvar:`OTEL_EXPORTER_JAEGER_ENDPOINT`
- :envvar:`OTEL_EXPORTER_JAEGER_CERTIFICATE`
- :envvar:`OTEL_EXPORTER_JAEGER_TIMEOUT`

API
---
Expand All @@ -68,7 +69,7 @@
from os import environ
from typing import Optional

from grpc import ChannelCredentials, insecure_channel, secure_channel
from grpc import ChannelCredentials, RpcError, insecure_channel, secure_channel

from opentelemetry import trace
from opentelemetry.exporter.jaeger.proto.grpc import util
Expand All @@ -85,11 +86,13 @@
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_TIMEOUT,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_GRPC_COLLECTOR_ENDPOINT = "localhost:14250"
DEFAULT_EXPORT_TIMEOUT = 10

logger = logging.getLogger(__name__)

Expand All @@ -103,6 +106,7 @@ class JaegerExporter(SpanExporter):
insecure: True if collector has no encryption or authentication
credentials: Credentials for server authentication.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
timeout: Maximum time the Jaeger exporter should wait for each batch export.
"""

def __init__(
Expand All @@ -111,13 +115,15 @@ def __init__(
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
max_tag_value_length: Optional[int] = None,
timeout: Optional[int] = None,
):
self._max_tag_value_length = max_tag_value_length

self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_ENDPOINT),
default=None,
self.collector_endpoint = collector_endpoint or environ.get(
OTEL_EXPORTER_JAEGER_ENDPOINT, DEFAULT_GRPC_COLLECTOR_ENDPOINT
)
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_JAEGER_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)
)
self._grpc_client = None
self.insecure = insecure
Expand All @@ -131,16 +137,15 @@ def __init__(

@property
def _collector_grpc_client(self) -> Optional[CollectorServiceStub]:
endpoint = self.collector_endpoint or DEFAULT_GRPC_COLLECTOR_ENDPOINT

if self._grpc_client is None:
if self.insecure:
self._grpc_client = CollectorServiceStub(
insecure_channel(endpoint)
insecure_channel(self.collector_endpoint)
)
else:
self._grpc_client = CollectorServiceStub(
secure_channel(endpoint, self.credentials)
secure_channel(self.collector_endpoint, self.credentials)
)
return self._grpc_client

Expand All @@ -161,25 +166,16 @@ def export(self, spans) -> SpanExportResult:
jaeger_spans = translator._translate(pb_translator)
batch = model_pb2.Batch(spans=jaeger_spans)
request = PostSpansRequest(batch=batch)
self._collector_grpc_client.PostSpans(request)

return SpanExportResult.SUCCESS
try:
self._collector_grpc_client.PostSpans(
request, timeout=self._timeout
)
return SpanExportResult.SUCCESS
except RpcError as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were we incorrectly not catching this exception before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, We werent' doing anything earlier so this was being handled in processor.

logger.warning(
"Failed to export batch. Status code: %s", error.code()
)
return SpanExportResult.FAILURE

def shutdown(self):
pass


def _parameter_setter(param, env_variable, default):
"""Returns value according to the provided data.

Args:
param: Constructor parameter value
env_variable: Environment variable related to the parameter
default: Constructor parameter default value
"""
if param is None:
res = env_variable or default
else:
res = param

return res
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_CERTIFICATE,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_RESOURCE_ATTRIBUTES,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
Expand Down Expand Up @@ -71,6 +72,7 @@ def test_constructor_by_environment_variables(self):
OTEL_EXPORTER_JAEGER_CERTIFICATE: os.path.dirname(__file__)
+ "/certs/cred.cert",
OTEL_RESOURCE_ATTRIBUTES: "service.name=my-opentelemetry-jaeger",
OTEL_EXPORTER_JAEGER_TIMEOUT: "5",
},
)

Expand All @@ -81,6 +83,7 @@ def test_constructor_by_environment_variables(self):
self.assertEqual(exporter.service_name, service)
self.assertIsNotNone(exporter._collector_grpc_client)
self.assertEqual(exporter.collector_endpoint, collector_endpoint)
self.assertEqual(exporter._timeout, 5)
self.assertIsNotNone(exporter.credentials)
env_patch.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_PORT`
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_HOST`
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES`
- :envvar:`OTEL_EXPORTER_JAEGER_TIMEOUT`

API
---
Expand Down Expand Up @@ -94,13 +95,15 @@
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_EXPORTER_JAEGER_USER,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_AGENT_HOST_NAME = "localhost"
DEFAULT_AGENT_PORT = 6831
DEFAULT_EXPORT_TIMEOUT = 10

logger = logging.getLogger(__name__)

Expand All @@ -119,6 +122,7 @@ class JaegerExporter(SpanExporter):
required.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
timeout: Maximum time the Jaeger exporter should wait for each batch export.
"""

def __init__(
Expand All @@ -130,51 +134,34 @@ def __init__(
password: Optional[str] = None,
max_tag_value_length: Optional[int] = None,
udp_split_oversized_batches: bool = None,
timeout: Optional[int] = None,
):
self._max_tag_value_length = max_tag_value_length
self.agent_host_name = _parameter_setter(
param=agent_host_name,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_AGENT_HOST),
default=DEFAULT_AGENT_HOST_NAME,
self.agent_host_name = agent_host_name or environ.get(
OTEL_EXPORTER_JAEGER_AGENT_HOST, DEFAULT_AGENT_HOST_NAME
)

environ_agent_port = environ.get(OTEL_EXPORTER_JAEGER_AGENT_PORT)
environ_agent_port = (
int(environ_agent_port) if environ_agent_port is not None else None
self.agent_port = agent_port or int(
environ.get(OTEL_EXPORTER_JAEGER_AGENT_PORT, DEFAULT_AGENT_PORT)
)

self.agent_port = _parameter_setter(
param=agent_port,
env_variable=environ_agent_port,
default=DEFAULT_AGENT_PORT,
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_JAEGER_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)
)
self.udp_split_oversized_batches = _parameter_setter(
param=udp_split_oversized_batches,
env_variable=environ.get(
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
),
default=False,

self.udp_split_oversized_batches = udp_split_oversized_batches or bool(
environ.get(OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES)
)
self._agent_client = AgentClientUDP(
host_name=self.agent_host_name,
port=self.agent_port,
split_oversized_batches=self.udp_split_oversized_batches,
)
self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_ENDPOINT),
default=None,
)
self.username = _parameter_setter(
param=username,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_USER),
default=None,
)
self.password = _parameter_setter(
param=password,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_PASSWORD),
default=None,
self.collector_endpoint = collector_endpoint or environ.get(
OTEL_EXPORTER_JAEGER_ENDPOINT
)
self.username = username or environ.get(OTEL_EXPORTER_JAEGER_USER)
self.password = password or environ.get(OTEL_EXPORTER_JAEGER_PASSWORD)
self._collector = None
tracer_provider = trace.get_tracer_provider()
self.service_name = (
Expand All @@ -195,8 +182,12 @@ def _collector_http_client(self) -> Optional[Collector]:
if self.username is not None and self.password is not None:
auth = (self.username, self.password)

# Thrift HTTP Client expects timeout in millis
timeout_in_millis = self._timeout * 1000.0
self._collector = Collector(
thrift_url=self.collector_endpoint, auth=auth
thrift_url=self.collector_endpoint,
auth=auth,
timeout_in_millis=timeout_in_millis,
)
return self._collector

Expand Down Expand Up @@ -226,19 +217,3 @@ def export(self, spans) -> SpanExportResult:

def shutdown(self):
pass


def _parameter_setter(param, env_variable, default):
"""Returns value according to the provided data.

Args:
param: Constructor parameter value
env_variable: Environment variable related to the parameter
default: Constructor parameter default value
"""
if param is None:
res = env_variable or default
else:
res = param

return res
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,29 @@ def emit(self, batch: jaeger.Batch):


class Collector:
"""Submits collected spans to Thrift HTTP server.
"""Submits collected spans to Jaeger collector in jaeger.thrift
format over binary thrift protocol. This is recommend option in cases where
it is not feasible to deploy Jaeger Agent next to the application,
for example, when the application code is running as AWS Lambda function.
In these scenarios the Jaeger Clients can be configured to submit spans directly
to the Collectors over HTTP/HTTPS.

Args:
thrift_url: URL of the Jaeger HTTP Thrift.
thrift_url: Endpoint used to send spans
directly to Collector the over HTTP.
auth: Auth tuple that contains username and password for Basic Auth.
timeout_in_millis: timeout for THttpClient.
"""

def __init__(self, thrift_url="", auth=None):
def __init__(self, thrift_url="", auth=None, timeout_in_millis=None):
self.thrift_url = thrift_url
self.auth = auth
self.http_transport = THttpClient.THttpClient(
uri_or_host=self.thrift_url
)
if timeout_in_millis is not None:
self.http_transport.setTimeout(timeout_in_millis)

self.protocol = TBinaryProtocol.TBinaryProtocol(self.http_transport)

# set basic auth header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
OTEL_EXPORTER_JAEGER_AGENT_PORT,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_EXPORTER_JAEGER_USER,
)
from opentelemetry.sdk.resources import SERVICE_NAME
Expand Down Expand Up @@ -148,6 +149,7 @@ def test_constructor_by_environment_variables(self):
OTEL_EXPORTER_JAEGER_ENDPOINT: collector_endpoint,
OTEL_EXPORTER_JAEGER_USER: username,
OTEL_EXPORTER_JAEGER_PASSWORD: password,
OTEL_EXPORTER_JAEGER_TIMEOUT: "20",
},
)

Expand All @@ -164,6 +166,7 @@ def test_constructor_by_environment_variables(self):
self.assertEqual(exporter.service_name, service)
self.assertEqual(exporter.agent_host_name, agent_host_name)
self.assertEqual(exporter.agent_port, int(agent_port))
self.assertEqual(exporter._timeout, 20)
self.assertTrue(exporter._collector_http_client is not None)
self.assertEqual(exporter.collector_endpoint, collector_endpoint)
self.assertEqual(exporter._collector_http_client.auth, auth)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@
.. envvar:: OTEL_EXPORTER_JAEGER_PASSWORD
"""

OTEL_EXPORTER_JAEGER_TIMEOUT = "OTEL_EXPORTER_JAEGER_TIMEOUT"
"""
.. envvar:: OTEL_EXPORTER_JAEGER_TIMEOUT

Maximum time the Jaeger exporter will wait for each batch export, the default
timeout is 10s.
"""

OTEL_EXPORTER_ZIPKIN_ENDPOINT = "OTEL_EXPORTER_ZIPKIN_ENDPOINT"
"""
Expand Down