Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zipkin exporter: Add timeout support and implement shutdown #1799

Merged
merged 13 commits into from
Apr 29, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1764](https://github.com/open-telemetry/opentelemetry-python/pull/1764))
- Added experimental HTTP back propagators.
([#1762](https://github.com/open-telemetry/opentelemetry-python/pull/1762))
- Zipkin exporter: Add support for timeout and implement shutdown
([#1799](https://github.com/open-telemetry/opentelemetry-python/pull/1799))

### Changed
- Adjust `B3Format` propagator to be spec compliant by not modifying context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
# local_node_ipv6="2001:db8::c001",
# local_node_port=31313,
# max_tag_value_length=256
# timeout=5 (in seconds)
)

# Create a BatchSpanProcessor and add the exporter to it
Expand All @@ -62,6 +63,7 @@
The exporter supports the following environment variable for configuration:

- :envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`
- :envvar:`OTEL_EXPORTER_ZIPKIN_TIMEOUT`

API
---
Expand All @@ -83,6 +85,7 @@
from opentelemetry.exporter.zipkin.node_endpoint import IpInput, NodeEndpoint
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_ZIPKIN_ENDPOINT,
OTEL_EXPORTER_ZIPKIN_TIMEOUT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this env var be proposed to the specs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR in spec repo open-telemetry/opentelemetry-specification#1636. It has already two approvals and I don't see any objection coming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice looks like it merged!

)
from opentelemetry.sdk.resources import SERVICE_NAME
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
Expand All @@ -103,7 +106,24 @@ def __init__(
local_node_ipv6: IpInput = None,
local_node_port: Optional[int] = None,
max_tag_value_length: Optional[int] = None,
timeout: Optional[int] = None,
):
"""Zipkin exporter.

Args:
version: The protocol version to be used.
endpoint: The endpoint of the Zipkin collector.
local_node_ipv4: Primary IPv4 address associated with this connection.
local_node_ipv6: Primary IPv6 address associated with this connection.
local_node_port: Depending on context, this could be a listen port or the
client-side of a socket.
max_tag_value_length: Max length string attribute values can have.
timeout: Maximum time the Zipkin exporter will wait for each batch export.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth calling out the 10s default?

codeboten marked this conversation as resolved.
Show resolved Hide resolved
The default value is 10s.

The tuple (local_node_ipv4, local_node_ipv6, local_node_port) is used to represent
the network context of a node in the service graph.
"""
self.local_node = NodeEndpoint(
local_node_ipv4, local_node_ipv6, local_node_port
)
Expand All @@ -119,7 +139,22 @@ def __init__(
elif version == Protocol.V2:
self.encoder = JsonV2Encoder(max_tag_value_length)

self.session = requests.Session()
self.session.headers.update(
{"Content-Type": self.encoder.content_type()}
)
self._closed = False
self.timeout = timeout or int(
environ.get(OTEL_EXPORTER_ZIPKIN_TIMEOUT, 10)
)

def export(self, spans: Sequence[Span]) -> SpanExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result
if self._closed:
logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILURE

# Populate service_name from first span
# We restrict any SpanProcessor to be only associated with a single
# TracerProvider, so it is safe to assume that all Spans in a single
Expand All @@ -129,10 +164,10 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
service_name = spans[0].resource.attributes.get(SERVICE_NAME)
if service_name:
self.local_node.service_name = service_name
result = requests.post(
result = self.session.post(
url=self.endpoint,
data=self.encoder.serialize(spans, self.local_node),
headers={"Content-Type": self.encoder.content_type()},
timeout=self.timeout,
)

if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES:
Expand All @@ -145,4 +180,8 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass
if self._closed:
logger.warning("Exporter already shutdown, ignoring call")
return
self.session.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this do to in-flight requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't affect the in-flight requests. Internally it uses this pool manager https://urllib3.readthedocs.io/en/1.24.3/reference/index.html#urllib3.poolmanager.PoolManager.clear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it won't drop them but it won't wait for them to finish either? I think we should probably wait for them to finish if it can be done easily.

self._closed = True
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_ZIPKIN_ENDPOINT,
OTEL_EXPORTER_ZIPKIN_TIMEOUT,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider, _Span
Expand All @@ -48,8 +49,8 @@ def setUpClass(cls):
)

def tearDown(self):
if OTEL_EXPORTER_ZIPKIN_ENDPOINT in os.environ:
del os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT]
os.environ.pop(OTEL_EXPORTER_ZIPKIN_ENDPOINT, None)
os.environ.pop(OTEL_EXPORTER_ZIPKIN_TIMEOUT, None)

def test_constructor_default(self):
exporter = ZipkinExporter()
Expand All @@ -63,6 +64,7 @@ def test_constructor_default(self):
def test_constructor_env_vars(self):
os_endpoint = "https://foo:9911/path"
os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] = os_endpoint
os.environ[OTEL_EXPORTER_ZIPKIN_TIMEOUT] = "15"

exporter = ZipkinExporter()

Expand All @@ -71,6 +73,7 @@ def test_constructor_env_vars(self):
self.assertEqual(exporter.local_node.ipv4, None)
self.assertEqual(exporter.local_node.ipv6, None)
self.assertEqual(exporter.local_node.port, None)
self.assertEqual(exporter.timeout, 15)

def test_constructor_protocol_endpoint(self):
"""Test the constructor for the common usage of providing the
Expand All @@ -92,13 +95,15 @@ def test_constructor_all_params_and_env_vars(self):
"""
os_endpoint = "https://os.env.param:9911/path"
os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] = os_endpoint
os.environ[OTEL_EXPORTER_ZIPKIN_TIMEOUT] = "15"

constructor_param_version = Protocol.V2
constructor_param_endpoint = "https://constructor.param:9911/path"
local_node_ipv4 = "192.168.0.1"
local_node_ipv6 = "2001:db8::1000"
local_node_port = 30301
max_tag_value_length = 56
timeout_param = 20

exporter = ZipkinExporter(
constructor_param_version,
Expand All @@ -107,6 +112,7 @@ def test_constructor_all_params_and_env_vars(self):
local_node_ipv6,
local_node_port,
max_tag_value_length,
timeout_param,
)

self.assertIsInstance(exporter.encoder, JsonV2Encoder)
Expand All @@ -119,24 +125,27 @@ def test_constructor_all_params_and_env_vars(self):
exporter.local_node.ipv6, ipaddress.IPv6Address(local_node_ipv6)
)
self.assertEqual(exporter.local_node.port, local_node_port)
# Assert timeout passed in constructor is prioritized over env
# when both are set.
self.assertEqual(exporter.timeout, 20)

@patch("requests.post")
@patch("requests.Session.post")
def test_export_success(self, mock_post):
mock_post.return_value = MockResponse(200)
spans = []
exporter = ZipkinExporter()
status = exporter.export(spans)
self.assertEqual(SpanExportResult.SUCCESS, status)

@patch("requests.post")
@patch("requests.Session.post")
def test_export_invalid_response(self, mock_post):
mock_post.return_value = MockResponse(404)
spans = []
exporter = ZipkinExporter()
status = exporter.export(spans)
self.assertEqual(SpanExportResult.FAILURE, status)

@patch("requests.post")
@patch("requests.Session.post")
def test_export_span_service_name(self, mock_post):
mock_post.return_value = MockResponse(200)
resource = Resource.create({SERVICE_NAME: "test"})
Expand All @@ -152,6 +161,30 @@ def test_export_span_service_name(self, mock_post):
exporter.export([span])
self.assertEqual(exporter.local_node.service_name, "test")

@patch("requests.Session.post")
def test_export_shutdown(self, mock_post):
mock_post.return_value = MockResponse(200)
spans = []
exporter = ZipkinExporter()
status = exporter.export(spans)
self.assertEqual(SpanExportResult.SUCCESS, status)

exporter.shutdown()
# Any call to .export() post shutdown should return failure
status = exporter.export(spans)
self.assertEqual(SpanExportResult.FAILURE, status)

@patch("requests.Session.post")
def test_export_timeout(self, mock_post):
mock_post.return_value = MockResponse(200)
spans = []
exporter = ZipkinExporter(timeout=2)
status = exporter.export(spans)
self.assertEqual(SpanExportResult.SUCCESS, status)
mock_post.assert_called_with(
url="http://localhost:9411/api/v2/spans", data="[]", timeout=2
)


class TestZipkinNodeEndpoint(unittest.TestCase):
def test_constructor_default(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
# local_node_ipv6="2001:db8::c001",
# local_node_port=31313,
# max_tag_value_length=256
# timeout=5 (in seconds)
)

# Create a BatchSpanProcessor and add the exporter to it
Expand All @@ -61,6 +62,7 @@
The exporter supports the following environment variable for configuration:

- :envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`
- :envvar:`OTEL_EXPORTER_ZIPKIN_TIMEOUT`

API
---
Expand All @@ -81,6 +83,7 @@
from opentelemetry.exporter.zipkin.node_endpoint import IpInput, NodeEndpoint
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_ZIPKIN_ENDPOINT,
OTEL_EXPORTER_ZIPKIN_TIMEOUT,
)
from opentelemetry.sdk.resources import SERVICE_NAME
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
Expand All @@ -100,7 +103,24 @@ def __init__(
local_node_ipv6: IpInput = None,
local_node_port: Optional[int] = None,
max_tag_value_length: Optional[int] = None,
timeout: Optional[int] = None,
):
"""Zipkin exporter.

Args:
version: The protocol version to be used.
endpoint: The endpoint of the Zipkin collector.
local_node_ipv4: Primary IPv4 address associated with this connection.
local_node_ipv6: Primary IPv6 address associated with this connection.
local_node_port: Depending on context, this could be a listen port or the
client-side of a socket.
max_tag_value_length: Max length string attribute values can have.
timeout: Maximum time the Zipkin exporter will wait for each batch export.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same note about mentioning the default timeout

codeboten marked this conversation as resolved.
Show resolved Hide resolved
The default value is 10s.

The tuple (local_node_ipv4, local_node_ipv6, local_node_port) is used to represent
the network context of a node in the service graph.
"""
self.local_node = NodeEndpoint(
local_node_ipv4, local_node_ipv6, local_node_port
)
Expand All @@ -113,7 +133,21 @@ def __init__(

self.encoder = ProtobufEncoder(max_tag_value_length)

self.session = requests.Session()
self.session.headers.update(
{"Content-Type": self.encoder.content_type()}
)
self._closed = False
self.timeout = timeout or int(
environ.get(OTEL_EXPORTER_ZIPKIN_TIMEOUT, 10)
)

def export(self, spans: Sequence[Span]) -> SpanExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result
if self._closed:
logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILURE
# Populate service_name from first span
# We restrict any SpanProcessor to be only associated with a single
# TracerProvider, so it is safe to assume that all Spans in a single
Expand All @@ -123,10 +157,10 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
service_name = spans[0].resource.attributes.get(SERVICE_NAME)
if service_name:
self.local_node.service_name = service_name
result = requests.post(
result = self.session.post(
url=self.endpoint,
data=self.encoder.serialize(spans, self.local_node),
headers={"Content-Type": self.encoder.content_type()},
timeout=self.timeout,
)

if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES:
Expand All @@ -139,4 +173,8 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass
if self._closed:
logger.warning("Exporter already shutdown, ignoring call")
return
self.session.close()
self._closed = True
Loading