Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mauricio/add testbed for otshim #727

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pytest!=5.2.3
pytest-cov>=2.8
readme-renderer~=24.0
httpretty~=1.0
opentracing~=2.2.0
47 changes: 47 additions & 0 deletions ext/opentelemetry-ext-opentracing-shim/tests/testbed/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

Testbed suite for the OpenTelemetry-OpenTracing Bridge
======================================================

Testbed suite designed to test the API changes.

Build and test.
---------------

.. code-block:: sh

tox -e py37-test-opentracing-shim

Alternatively, due to the organization of the suite, it's possible to run directly the tests using ``py.test``\ :

.. code-block:: sh

py.test -s testbed/test_multiple_callbacks/test_threads.py

Tested frameworks
-----------------

Currently the examples cover ``threading`` and ``asyncio``.

List of patterns
----------------


* `Active Span replacement <test_active_span_replacement>`_ - Start an isolated task and query for its results in another task/thread.
* `Client-Server <test_client_server>`_ - Typical client-server example.
* `Common Request Handler <test_common_request_handler>`_ - One request handler for all requests.
* `Late Span finish <test_late_span_finish>`_ - Late parent ``Span`` finish.
* `Multiple callbacks <test_multiple_callbacks>`_ - Multiple callbacks spawned at the same time.
* `Nested callbacks <test_nested_callbacks>`_ - One callback at a time, defined in a pipeline fashion.
* `Subtask Span propagation <test_subtask_span_propagation>`_ - ``Span`` propagation for subtasks/coroutines.

Adding new patterns
-------------------

A new pattern is composed of a directory under *testbed* with the *test_* prefix, and containing the files for each platform, also with the *test_* prefix:

.. code-block::

testbed/
test_new_pattern/
test_threads.py
test_asyncio.py
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import opentelemetry.ext.opentracing_shim as opentracingshim
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)


class MockTracer(opentracingshim.TracerShim):
"""Wrapper of `opentracingshim.TracerShim`.

MockTracer extends `opentracingshim.TracerShim` by adding a in memory
span exporter that can be used to get the list of finished spans."""

def __init__(self):
tracer_provider = trace.TracerProvider()
oteltracer = tracer_provider.get_tracer(__name__)
super(MockTracer, self).__init__(oteltracer)
exporter = InMemorySpanExporter()
span_processor = SimpleExportSpanProcessor(exporter)
tracer_provider.add_span_processor(span_processor)

self.exporter = exporter

def finished_spans(self):
return self.exporter.get_finished_spans()
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

Active Span replacement example.
================================

This example shows a ``Span`` being created and then passed to an asynchronous task, which will temporary activate it to finish its processing, and further restore the previously active ``Span``.

``threading`` implementation:

.. code-block:: python

# Create a new Span for this task
with self.tracer.start_active_span("task"):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import print_function

import asyncio

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import stop_loop_when


class TestAsyncio(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.loop = asyncio.get_event_loop()

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span("initial")
self.submit_another_task(span)

stop_loop_when(
self.loop,
lambda: len(self.tracer.finished_spans()) >= 3,
timeout=5.0,
)
self.loop.run_forever()

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
self.assertNamesEqual(spans, ["initial", "subtask", "task"])

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(spans[1], spans[2])
self.assertIsChildOf(spans[1], spans[2])

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(spans[0], spans[1])
self.assertEqual(spans[0].parent, None)

async def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span("task"):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass

def submit_another_task(self, span):
self.loop.create_task(self.task(span))
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase


class TestThreads(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
# use max_workers=3 as a general example even if only one would suffice
self.executor = ThreadPoolExecutor(max_workers=3)

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span("initial")
self.submit_another_task(span)

self.executor.shutdown(True)

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
self.assertNamesEqual(spans, ["initial", "subtask", "task"])

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(spans[1], spans[2])
self.assertIsChildOf(spans[1], spans[2])

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(spans[0], spans[1])
self.assertEqual(spans[0].parent, None)
self.assertEqual(spans[2].parent, None)

def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span("task"):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass

def submit_another_task(self, span):
self.executor.submit(self.task, span)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

Client-Server example.
======================

This example shows a ``Span`` created by a ``Client``, which will send a ``Message`` / ``SpanContext`` to a ``Server``, which will in turn extract such context and use it as parent of a new (server-side) ``Span``.

``Client.send()`` is used to send messages and inject the ``SpanContext`` using the ``TEXT_MAP`` format, and ``Server.process()`` will process received messages and will extract the context used as parent.

.. code-block:: python

def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(scope.span.context,
opentracing.Format.TEXT_MAP,
message)
self.queue.put(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import print_function

import asyncio

import opentracing
from opentracing.ext import tags

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import get_logger, get_one_by_tag, stop_loop_when

logger = get_logger(__name__)


class Server:
def __init__(self, *args, **kwargs):
tracer = kwargs.pop("tracer")
queue = kwargs.pop("queue")
super(Server, self).__init__(*args, **kwargs)

self.tracer = tracer
self.queue = queue

async def run(self):
value = await self.queue.get()
self.process(value)

def process(self, message):
logger.info("Processing message in server")

ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
with self.tracer.start_active_span("receive", child_of=ctx) as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)


class Client:
def __init__(self, tracer, queue):
self.tracer = tracer
self.queue = queue

async def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(
scope.span.context, opentracing.Format.TEXT_MAP, message
)
await self.queue.put(message)

logger.info("Sent message from client")


class TestAsyncio(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
self.server = Server(tracer=self.tracer, queue=self.queue)

def test(self):
client = Client(self.tracer, self.queue)
self.loop.create_task(self.server.run())
self.loop.create_task(client.send())

stop_loop_when(
self.loop,
lambda: len(self.tracer.finished_spans()) >= 2,
timeout=5.0,
)
self.loop.run_forever()

spans = self.tracer.finished_spans()
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
)
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import print_function

from queue import Queue
from threading import Thread

import opentracing
from opentracing.ext import tags

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import await_until, get_logger, get_one_by_tag

logger = get_logger(__name__)


class Server(Thread):
def __init__(self, *args, **kwargs):
tracer = kwargs.pop("tracer")
queue = kwargs.pop("queue")
super(Server, self).__init__(*args, **kwargs)

self.daemon = True
self.tracer = tracer
self.queue = queue

def run(self):
value = self.queue.get()
self.process(value)

def process(self, message):
logger.info("Processing message in server")

ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
with self.tracer.start_active_span("receive", child_of=ctx) as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)


class Client:
def __init__(self, tracer, queue):
self.tracer = tracer
self.queue = queue

def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(
scope.span.context, opentracing.Format.TEXT_MAP, message
)
self.queue.put(message)

logger.info("Sent message from client")


class TestThreads(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.queue = Queue()
self.server = Server(tracer=self.tracer, queue=self.queue)
self.server.start()

def test(self):
client = Client(self.tracer, self.queue)
client.send()

await_until(lambda: len(self.tracer.finished_spans()) >= 2)

spans = self.tracer.finished_spans()
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
)
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
)
Loading