diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py
new file mode 100644
index 00000000000..bd25103d973
--- /dev/null
+++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/accumulator.py
@@ -0,0 +1,55 @@
+import collections
+import threading
+import typing
+
+from opentelemetry.sdk.trace import ReadableSpan
+
+
+class SpanAccumulator:
+    """
+    A thread-safe container designed to collect and batch spans. It accumulates spans until a specified batch size is
+    reached, at which point the accumulated spans are moved into a FIFO queue. Provides methods to add spans, check if
+    the accumulator is non-empty, and retrieve the earliest batch of spans from the queue.
+    """
+
+    def __init__(self, batch_size: int):
+        self._batch_size = batch_size
+        self._spans: typing.List[ReadableSpan] = []
+        self._batches = collections.deque()  # fixme set max size
+        self._lock = threading.Lock()
+
+    def nonempty(self) -> bool:
+        """
+        Checks if the accumulator contains any spans or batches. It returns True if either the span list or the batch
+        queue is non-empty, and False otherwise.
+        """
+        with self._lock:
+            return len(self._spans) > 0 or len(self._batches) > 0
+
+    def push(self, span: ReadableSpan) -> bool:
+        """
+        Adds a span to the accumulator. If the addition causes the number of spans to reach the
+        specified batch size, the accumulated spans are moved into a FIFO queue as a new batch. Returns True if a new
+        batch was created, otherwise returns False.
+        """
+        with self._lock:
+            self._spans.append(span)
+            if len(self._spans) < self._batch_size:
+                return False
+            self._batches.appendleft(self._spans)
+            self._spans = []
+            return True
+
+    def batch(self) -> typing.List[ReadableSpan]:
+        """
+        Returns the earliest (first in line) batch of spans from the FIFO queue. If the queue is empty, returns any
+        remaining spans that haven't been batched.
+        """
+        try:
+            return self._batches.pop()
+        except IndexError:
+            # if there are no batches left, return the current spans
+            with self._lock:
+                out = self._spans
+                self._spans = []
+                return out
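Usage sketch for SpanAccumulator (illustrative only, not part of the diff; `span_a` and `span_b` stand in for any ReadableSpan instances):

    acc = SpanAccumulator(batch_size=2)
    acc.push(span_a)            # returns False: batch not yet full
    full = acc.push(span_b)     # returns True: [span_a, span_b] moved into the queue
    if full:
        ready = acc.batch()     # earliest queued batch, FIFO order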
+ """ + + def __init__( + self, + target: str = 'localhost:4317', + timeout_sec: int = 10, + ): + self._timeout_sec = timeout_sec + self._stub = pb.TraceServiceStub(grpc.insecure_channel(target)) + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + try: + self._stub.Export(request=trace_encoder.encode_spans(batch), timeout=self._timeout_sec) + except grpc.RpcError as err: + # noinspection PyUnresolvedReferences + return err.code() + return grpc.StatusCode.OK + + +class RetryingGrpcClient(GrpcClientABC): + """ + A GRPC client implementation that wraps another GRPC client and retries failed requests using exponential backoff. + The `sleepfunc` arg can be passed in to fake time-based sleeping for testing. + """ + + def __init__( + self, + client: GrpcClientABC, + sleepfunc: typing.Callable[[int], None] = time.sleep, + max_retries: int = 4, + initial_sleep_time_sec: int = 0.5, + ): + self._client = client + self._sleep = sleepfunc + self._max_retries = max_retries + self._initial_sleep_time_sec = initial_sleep_time_sec + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + sleep_time_sec = self._initial_sleep_time_sec + out = grpc.StatusCode.OK + for i in range(self._max_retries): + out = self._client.send(batch) + if out == grpc.StatusCode.OK: + return grpc.StatusCode.OK + self._sleep(sleep_time_sec) + sleep_time_sec *= 2 + return out + + +class FakeGrpcClient(GrpcClientABC): + """ + A fake GRPC client that can be used for testing. To fake status codes, optionally set the `status_codes` arg to a + list/tuple of status codes you want the send() method to return. If there are more calls to send() than there are + status codes, the last status code is reused. Set the `sleep_time_sec` arg to a positive value to sleep every time + there is a send(), simulating network transmission time. + """ + + def __init__( + self, + status_codes: typing.List[grpc.StatusCode] = (grpc.StatusCode.OK,), + sleep_time_sec: int = 0, + ): + self._status_codes = status_codes + self._sleep_time_sec = sleep_time_sec + self._batches = [] + + def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode: + time.sleep(self._sleep_time_sec) + self._batches.append(batch) + num_sends = len(self._batches) + idx = min(num_sends, len(self._status_codes)) - 1 + return self._status_codes[idx] + + def get_batches(self): + return self._batches + + def num_batches(self): + return len(self._batches) + + def num_spans_in_batch(self, idx: int): + return len(self._batches[idx]) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py new file mode 100644 index 00000000000..4a460ef9d60 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/exporter.py @@ -0,0 +1,40 @@ +import typing +from enum import Enum + +import grpc + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.client import GrpcClientABC, RetryingGrpcClient, GrpcClient + + +class ExporterFlushResult(Enum): + SUCCESS = 1 + FAILURE = 2 + TIMEOUT = 3 + + +class OTLPSpanExporter2(SpanExporter): + """ + An implementation of SpanExporter. Accepts an optional client. If one is not supplied, creates a retrying client. + Sends spans immediately -- has no queue to flush or separate thread to shut down. 
+ """ + + def __init__(self, client: GrpcClientABC = RetryingGrpcClient(GrpcClient())): + self._client = client + + def export(self, batch: typing.Sequence[ReadableSpan]) -> SpanExportResult: + status_code = self._client.send(batch) + return SpanExportResult.SUCCESS if status_code == grpc.StatusCode.OK else SpanExportResult.FAILURE + + def force_flush(self, timeout_millis: int = 30000) -> ExporterFlushResult: + """ + Nothing to flush. + """ + return ExporterFlushResult.SUCCESS + + def shutdown(self) -> None: + """ + Nothing to shut down. + """ + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py new file mode 100644 index 00000000000..40680d8ee61 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/processor.py @@ -0,0 +1,66 @@ +import typing + +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator +from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, EventBasedTimer, ThreadBasedTimer + + +class BatchSpanProcessor2(SpanProcessor): + """ + A SpanProcessor that sends spans in batches on an interval or when a maximum number of spans has been reached, + whichever comes first. + """ + + def __init__( + self, + exporter: SpanExporter, + max_batch_size: int = 512, + interval_sec: int = 5, + timer: typing.Optional[TimerABC] = None, + ): + self._exporter = exporter + self._accumulator = SpanAccumulator(max_batch_size) + self._timer = timer or ThreadBasedTimer(interval_sec) + self._timer.set_callback(self._export_batch) + self._timer.start() + + def on_start(self, span: Span, parent_context: typing.Optional[Context] = None) -> None: + pass + + def on_end(self, span: ReadableSpan) -> None: + """ + This method must be extremely fast. It adds the span to the accumulator for later sending and pokes the timer + if the number of spans waiting to be sent has reached the maximum batch size. + """ + full = self._accumulator.push(span) + if full: + self._timer.poke() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """ + Stops the timer, exports any spans in the accumulator then restarts the timer. + """ + self._timer.stop() + self._exporter.force_flush(timeout_millis) # this may be a no-op depending on the impl + while self._accumulator.nonempty(): + result = self._export_batch() + if result != SpanExportResult.SUCCESS: + return False + self._timer.start() + + def shutdown(self) -> None: + self._timer.stop() + while self._accumulator.nonempty(): + self._export_batch() + self._exporter.shutdown() + + def _export_batch(self) -> SpanExportResult: + """ + This is the timer's callback. It runs on its own thread. 
+ """ + batch = self._accumulator.batch() + if len(batch) > 0: + return self._exporter.export(batch) + return SpanExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py new file mode 100644 index 00000000000..eb8f769fb86 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/experimental/timer.py @@ -0,0 +1,151 @@ +import abc +import threading +import typing + + +class TimerABC(abc.ABC): + """ + An interface extracted from PeriodicTimer so alternative implementations can be used for testing. + + Implementations should execute the passed-in callback on a timer at the specified interval at a minimum. The + callback can be run sooner than the interval via the poke() method, which also resets the timer. + """ + + @abc.abstractmethod + def set_callback(self, cb) -> None: + pass + + @abc.abstractmethod + def start(self) -> None: + pass + + @abc.abstractmethod + def poke(self) -> None: + pass + + @abc.abstractmethod + def stop(self) -> None: + pass + + +class ThreadBasedTimer(TimerABC): + """ + A Timer implementation that uses a threading.Timer for each interval and runs the callback asynchronously using a + new Thread on poke(). + """ + + def __init__(self, interval_sec: int): + self.interval_sec = interval_sec + self.cb = lambda: None + self.timer = None + self.lock = threading.Lock() + + def set_callback(self, cb) -> None: + with self.lock: + self.cb = cb + + def start(self) -> None: + with self.lock: + self.timer = threading.Timer(self.interval_sec, self._work) + self.timer.daemon = True + self.timer.start() + + def _work(self): + self.cb() + self.start() + + def poke(self) -> None: + with self.lock: + self._do_stop() + threading.Thread(target=self._work, daemon=True).start() + + def stop(self) -> None: + with self.lock: + self._do_stop() + + def _do_stop(self): + if self.timer is None: + return + self.timer.cancel() + self.timer = None + + +class EventBasedTimer(TimerABC): + """ + Deprecated but left here for reference. I believe this implementation is unnecessarily complicated. + """ + + def __init__( + self, + interval_sec: int, + callback: typing.Callable[[], None] = lambda: None, + daemon: bool = True, + ): + self._interval_sec = interval_sec + self._callback = callback + self._daemon = daemon + self._stop = threading.Event() + self._poke = threading.Event() + self._new_thread() + + def _new_thread(self): + self._thread = threading.Thread(target=self._work, daemon=self._daemon) + + def set_callback(self, callback: typing.Callable[[], None]) -> None: + self._callback = callback + + def start(self) -> None: + self._thread.start() + + def _work(self) -> None: + while True: + self._poke.wait(timeout=self._interval_sec) + if self._stop.is_set(): + break + self._callback() + + def poke(self) -> None: + """ + This method schedules the callback to be executed immediately instead of waiting for the next timeout. It also + resets the timer. + """ + self._poke.set() + + def stop(self) -> None: + self._stop.set() + self.poke() # in case we're waiting for a poke timeout + self._thread.join() + self._new_thread() # in case we want to start it again + + def started(self) -> bool: + return self._thread.is_alive() + + def stopped(self) -> bool: + return self._stop.is_set() + + +class ThreadlessTimer(TimerABC): + """ + For testing/experimentation. Synchronously executes the callback when you call poke(). 
+ """ + + def __init__(self): + self._cb = lambda: None + + def set_callback(self, cb): + self._cb = cb + + def start(self): + pass + + def poke(self): + self._cb() + + def stop(self): + pass + + def started(self) -> None: + pass + + def stopped(self) -> None: + pass diff --git a/opentelemetry-sdk/tests/trace/export/__init__.py b/opentelemetry-sdk/tests/trace/export/__init__.py deleted file mode 100644 index b0a6f428417..00000000000 --- a/opentelemetry-sdk/tests/trace/export/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py new file mode 100644 index 00000000000..591a30825f7 --- /dev/null +++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py @@ -0,0 +1,123 @@ +import time +import typing +import unittest + +import grpc + +import util +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, BatchSpanProcessor +from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient +from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2 +from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2 +from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer, ThreadBasedTimer + + +class TestBatchSpanProcessor(unittest.TestCase): + + def test_export_by_max_batch_size(self): + exporter = FakeSpanExporter() + timer = ThreadlessTimer() + proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=2) + num_spans = 16 + for _ in range(num_spans): + util.create_start_and_end_span('foo', proc) + # we have a batch size of 2, and we've exported 16 spans, so we expect 8 batches + self.assertEqual(8, exporter.count()) + + def test_export_by_timer(self): + exporter = FakeSpanExporter() + timer = ThreadlessTimer() + # we have a batch size of 32, which is larger than the 16 spans that we're planning on sending + proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=32) + num_spans = 16 + for i in range(num_spans): + util.create_start_and_end_span('foo', proc) + self.assertEqual(0, exporter.count()) + # we want this test to be fast, so we don't sleep() -- instead we perform a manual poke() + timer.poke() + self.assertEqual(1, exporter.count()) + + +class TestRetryingGrpcClient(unittest.TestCase): + + def test_success(self): + fs = FakeSleeper() + rt = RetryingGrpcClient( + FakeGrpcClient([grpc.StatusCode.OK]), + sleepfunc=fs.sleep + ) + status_code = rt.send([(util.mk_readable_span())]) + self.assertEqual(grpc.StatusCode.OK, status_code) + self.assertListEqual([], fs.get_sleeps()) + + def test_retry_all_failed(self): + fs = FakeSleeper() + rt = RetryingGrpcClient( + FakeGrpcClient([grpc.StatusCode.UNAVAILABLE]), + sleepfunc=fs.sleep + ) + status_code = rt.send([(util.mk_readable_span())]) + 
diff --git a/opentelemetry-sdk/tests/trace/export/test_experimental.py b/opentelemetry-sdk/tests/trace/export/test_experimental.py
new file mode 100644
index 00000000000..591a30825f7
--- /dev/null
+++ b/opentelemetry-sdk/tests/trace/export/test_experimental.py
@@ -0,0 +1,121 @@
+import typing
+import unittest
+
+import grpc
+
+import util
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
+from opentelemetry.sdk.trace.export.experimental.client import RetryingGrpcClient, FakeGrpcClient
+from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2
+from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2
+from opentelemetry.sdk.trace.export.experimental.timer import ThreadlessTimer
+
+
+class TestBatchSpanProcessor(unittest.TestCase):
+
+    def test_export_by_max_batch_size(self):
+        exporter = FakeSpanExporter()
+        timer = ThreadlessTimer()
+        proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=2)
+        num_spans = 16
+        for _ in range(num_spans):
+            util.create_start_and_end_span('foo', proc)
+        # we have a batch size of 2, and we've exported 16 spans, so we expect 8 batches
+        self.assertEqual(8, exporter.count())
+
+    def test_export_by_timer(self):
+        exporter = FakeSpanExporter()
+        timer = ThreadlessTimer()
+        # we have a batch size of 32, which is larger than the 16 spans that we're planning on sending
+        proc = BatchSpanProcessor2(exporter=exporter, timer=timer, max_batch_size=32)
+        num_spans = 16
+        for _ in range(num_spans):
+            util.create_start_and_end_span('foo', proc)
+        self.assertEqual(0, exporter.count())
+        # we want this test to be fast, so we don't sleep() -- instead we perform a manual poke()
+        timer.poke()
+        self.assertEqual(1, exporter.count())
+
+
+class TestRetryingGrpcClient(unittest.TestCase):
+
+    def test_success(self):
+        fs = FakeSleeper()
+        rt = RetryingGrpcClient(
+            FakeGrpcClient([grpc.StatusCode.OK]),
+            sleepfunc=fs.sleep
+        )
+        status_code = rt.send([util.mk_readable_span()])
+        self.assertEqual(grpc.StatusCode.OK, status_code)
+        self.assertListEqual([], fs.get_sleeps())
+
+    def test_retry_all_failed(self):
+        fs = FakeSleeper()
+        rt = RetryingGrpcClient(
+            FakeGrpcClient([grpc.StatusCode.UNAVAILABLE]),
+            sleepfunc=fs.sleep
+        )
+        status_code = rt.send([util.mk_readable_span()])
+        self.assertEqual(grpc.StatusCode.UNAVAILABLE, status_code)
+        self.assertListEqual([0.5, 1.0, 2.0, 4.0], fs.get_sleeps())
+
+    def test_retry_initial_failure_then_success(self):
+        fs = FakeSleeper()
+        rt = RetryingGrpcClient(
+            FakeGrpcClient([grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.OK]),
+            sleepfunc=fs.sleep
+        )
+        status_code = rt.send([util.mk_readable_span()])
+        self.assertEqual(grpc.StatusCode.OK, status_code)
+        self.assertListEqual([0.5], fs.get_sleeps())
+
+
+class TestOTLPSpanExporter(unittest.TestCase):
+
+    def test_exporter(self):
+        client = FakeGrpcClient()
+        exporter = OTLPSpanExporter2(client=client)
+        timer = ThreadlessTimer()
+        proc = BatchSpanProcessor2(exporter, timer=timer, max_batch_size=128)
+        span = util.mk_readable_span()
+        num_spans = 100  # less than the batch size of 128
+        for _ in range(num_spans):
+            proc.on_end(span)
+        timer.poke()
+        self.assertEqual(1, client.num_batches())
+        self.assertEqual(num_spans, client.num_spans_in_batch(0))
+
+
+class FakeSleeper:
+
+    def __init__(self):
+        self._sleeps = []
+
+    def sleep(self, seconds):
+        self._sleeps.append(seconds)
+
+    def get_sleeps(self):
+        return self._sleeps
+
+
+class FakeSpanExporter(SpanExporter):
+
+    def __init__(self):
+        self._exported = []
+
+    def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
+        self._exported.append(spans)
+        return SpanExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        pass
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        return True
+
+    def get_exported(self):
+        return self._exported
+
+    def count(self):
+        return len(self._exported)
diff --git a/opentelemetry-sdk/tests/trace/export/test_integration.py b/opentelemetry-sdk/tests/trace/export/test_integration.py
new file mode 100644
index 00000000000..8f1cddaf513
--- /dev/null
+++ b/opentelemetry-sdk/tests/trace/export/test_integration.py
@@ -0,0 +1,165 @@
+import threading
+import time
+import unittest
+from concurrent import futures
+from os import environ
+
+import grpc
+
+import util
+from opentelemetry.proto.collector.logs.v1 import logs_service_pb2, logs_service_pb2_grpc
+from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2, metrics_service_pb2_grpc
+from opentelemetry.proto.collector.trace.v1 import trace_service_pb2, trace_service_pb2_grpc
+from opentelemetry.sdk.trace import SpanProcessor
+from opentelemetry.sdk.trace.export.experimental.exporter import OTLPSpanExporter2
+from opentelemetry.sdk.trace.export.experimental.processor import BatchSpanProcessor2
+from opentelemetry.sdk.trace.export.experimental.timer import ThreadBasedTimer
+
+
+@unittest.skipUnless(environ.get('RUN_LONG_TESTS', '').lower() == 'true', 'Skipping, RUN_LONG_TESTS not set')
+class TestIntegration(unittest.TestCase):
+
+    def test_full_speed(self):
+        server = OTLPServer()
+        server.start()
+        max_interval_sec = 4
+
+        bsp = BatchSpanProcessor2(OTLPSpanExporter2())
+        num_spans_per_firehose = 1_000
+        sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0)
+
+        threads = []
+        num_threads = 128
+        for _ in range(num_threads):
+            thread = threading.Thread(target=sf.run)
+            thread.start()
+            threads.append(thread)
+
+        for thread in threads:
+            thread.join()
+
+        time.sleep(max_interval_sec * 2)
+
+        num_span_received = server.get_num_spans_received()
+        self.assertEqual(num_spans_per_firehose * num_threads, num_span_received)
+        server.stop()
+
+    def test_slower(self):
+        server = OTLPServer()
+        server.start()
+        max_interval_sec = 4
+        bsp = BatchSpanProcessor2(OTLPSpanExporter2(), timer=ThreadBasedTimer(max_interval_sec))
+        num_spans_per_firehose = 1000
+        sf = SpanFirehose(bsp, num_spans=num_spans_per_firehose, sleep_sec=0.01)
+        threads = []
+        num_threads = 128
+        for _ in range(num_threads):
+            thread = threading.Thread(target=sf.run)
+            thread.start()
+            threads.append(thread)
+        for thread in threads:
+            thread.join()
+        time.sleep(max_interval_sec * 2)
+        num_span_received = server.get_num_spans_received()
+        self.assertEqual(num_spans_per_firehose * num_threads, num_span_received)
+        server.stop()
+
+    def test_slow_enough_to_engage_timer(self):
+        server = OTLPServer()
+        server.start()
+        bsp = BatchSpanProcessor2(OTLPSpanExporter2())
+        num_spans = 10
+        sf = SpanFirehose(bsp, num_spans=num_spans, sleep_sec=1)
+        sf.run()
+        time.sleep(5)
+        num_span_received = server.get_num_spans_received()
+        self.assertEqual(num_spans, num_span_received)
+        server.stop()
+
+
+class SpanFirehose:
+
+    def __init__(self, sp: SpanProcessor, num_spans: int, sleep_sec: float):
+        self._sp = sp
+        self._num_spans = num_spans
+        self._sleep_sec = sleep_sec
+
+    def run(self) -> float:
+        start = time.time()
+        span = util.mk_span('test-span')
+        for _ in range(self._num_spans):
+            time.sleep(self._sleep_sec)
+            self._sp.on_end(span)
+        return time.time() - start
+
+
+class OTLPServer:
+
+    def __init__(self):
+        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+
+        self.trace_servicer = TraceServiceServicer()
+        trace_service_pb2_grpc.add_TraceServiceServicer_to_server(self.trace_servicer, self.server)
+
+        metrics_servicer = MetricsServiceServicer()
+        metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server(metrics_servicer, self.server)
+
+        logs_servicer = LogsServiceServicer()
+        logs_service_pb2_grpc.add_LogsServiceServicer_to_server(logs_servicer, self.server)
+
+        self.server.add_insecure_port('0.0.0.0:4317')
+
+    def start(self):
+        self.server.start()
+
+    def stop(self):
+        self.server.stop(0)
+
+    def get_num_spans_received(self):
+        return self.trace_servicer.get_num_spans()
+
+
+def serve_otel_grpc():
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    trace_service_pb2_grpc.add_TraceServiceServicer_to_server(TraceServiceServicer(), server)
+    metrics_service_pb2_grpc.add_MetricsServiceServicer_to_server(MetricsServiceServicer(), server)
+    logs_service_pb2_grpc.add_LogsServiceServicer_to_server(LogsServiceServicer(), server)
+    server.add_insecure_port('0.0.0.0:4317')
+    server.start()
+    return server
+
+
+class LogsServiceServicer(logs_service_pb2_grpc.LogsServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return logs_service_pb2.ExportLogsServiceResponse()
+
+
+class TraceServiceServicer(trace_service_pb2_grpc.TraceServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return trace_service_pb2.ExportTraceServiceResponse()
+
+    def get_num_spans(self):
+        out = 0
+        for req in self.requests_received:
+            out += len(req.resource_spans[0].scope_spans[0].spans)
+        return out
+
+
+class MetricsServiceServicer(metrics_service_pb2_grpc.MetricsServiceServicer):
+
+    def __init__(self):
+        self.requests_received = []
+
+    def Export(self, request, context):
+        self.requests_received.append(request)
+        return metrics_service_pb2.ExportMetricsServiceResponse()
diff --git a/opentelemetry-sdk/tests/trace/export/util.py b/opentelemetry-sdk/tests/trace/export/util.py
new file mode 100644
index 00000000000..784cd1a2d1f
--- /dev/null
+++ b/opentelemetry-sdk/tests/trace/export/util.py
@@ -0,0 +1,29 @@
+from opentelemetry.sdk.trace import ReadableSpan, _Span
+from opentelemetry.trace import SpanContext, TraceFlags
+
+
+def mk_readable_span():
+    ctx = SpanContext(0, 0, False)
+    return ReadableSpan(context=ctx, attributes={})
+
+
+def mk_spans(n):
+    span = mk_span('foo')
+    out = []
+    for _ in range(n):
+        out.append(span)
+    return out
+
+
+def create_start_and_end_span(name, span_processor):
+    span = _Span(name, mk_ctx(), span_processor=span_processor)
+    span.start()
+    span.end()
+
+
+def mk_span(name):
+    return _Span(name=name, context=mk_ctx())
+
+
+def mk_ctx():
+    return SpanContext(1, 2, False, trace_flags=TraceFlags(TraceFlags.SAMPLED))