Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fixes for opentracing scopes #11869

Merged
merged 6 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11869.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that `opentracing` scopes are activated and closed at the right time.
29 changes: 23 additions & 6 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,14 @@ def start_active_span(
start_time=None,
ignore_active_span=False,
finish_on_close=True,
*,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this isn't a standard parameter for the underlying opentracing.tracer.start_active_span, I wanted to make it a kw-only param so that we can safely add more positional params to pass through in future.

tracer=None,
):
"""Starts an active opentracing span. Note, the scope doesn't become active
until it has been entered, however, the span starts from the time this
message is called.
"""Starts an active opentracing span.

Records the start time for the span, and sets it as the "active span" in the
scope manager.

Args:
See opentracing.tracer
Returns:
Expand All @@ -456,7 +460,11 @@ def start_active_span(
if opentracing is None:
return noop_context_manager() # type: ignore[unreachable]

return opentracing.tracer.start_active_span(
if tracer is None:
# use the global tracer by default
tracer = opentracing.tracer

return tracer.start_active_span(
operation_name,
child_of=child_of,
references=references,
Expand All @@ -468,7 +476,11 @@ def start_active_span(


def start_active_span_follows_from(
operation_name: str, contexts: Collection, inherit_force_tracing=False
operation_name: str,
contexts: Collection,
*,
inherit_force_tracing=False,
tracer=None,
):
"""Starts an active opentracing span, with additional references to previous spans

Expand All @@ -477,12 +489,17 @@ def start_active_span_follows_from(
contexts: the previous spans to inherit from
inherit_force_tracing: if set, and any of the previous contexts have had tracing
forced, the new span will also have tracing forced.
tracer: override the opentracing tracer. By default the global tracer is used.
"""
if opentracing is None:
return noop_context_manager() # type: ignore[unreachable]

references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
scope = start_active_span(
operation_name,
references=references,
tracer=tracer,
)

if inherit_force_tracing and any(
is_context_forced_tracing(ctx) for ctx in contexts
Expand Down
76 changes: 47 additions & 29 deletions synapse/logging/scopecontextmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class LogContextScopeManager(ScopeManager):
The LogContextScopeManager tracks the active scope in opentracing
by using the log contexts which are native to synapse. This is so
that the basic opentracing api can be used across twisted defereds.
(I would love to break logcontexts and this into an OS package. but
let's wait for twisted's contexts to be released.)

It would be nice just to use opentracing's ContextVarsScopeManager,
but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
"""

def __init__(self, config):
Expand Down Expand Up @@ -65,29 +66,45 @@ def activate(self, span, finish_on_close):
Scope.close() on the returned instance.
"""

enter_logcontext = False
ctx = current_context()

if not ctx:
# We don't want this scope to affect.
logger.error("Tried to activate scope outside of loggingcontext")
return Scope(None, span) # type: ignore[arg-type]
elif ctx.scope is not None:
# We want the logging scope to look exactly the same so we give it
# a blank suffix

if ctx.scope is not None:
# start a new logging context as a child of the existing one.
# Doing so -- rather than updating the existing logcontext -- means that
# creating several concurrent spans under the same logcontext works
# correctly.
ctx = nested_logging_context("")
enter_logcontext = True
else:
# if there is no span currently associated with the current logcontext, we
# just store the scope in it.
#
# This feels a bit dubious, but it does hack around a problem where a
# span outlasts its parent logcontext (which would otherwise lead to
# "Re-starting finished log context" errors).
enter_logcontext = False

scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
ctx.scope = scope
if enter_logcontext:
ctx.__enter__()

return scope


class _LogContextScope(Scope):
"""
A custom opentracing scope. The only significant difference is that it will
close the log context it's related to if the logcontext was created specifically
for this scope.
A custom opentracing scope, associated with a LogContext

* filters out _DefGen_Return exceptions which arise from calling
`defer.returnValue` in Twisted code

* When the scope is closed, the logcontext's active scope is reset to None.
and - if enter_logcontext was set - the logcontext is finished too.
"""

def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close):
Expand All @@ -101,8 +118,7 @@ def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close)
logcontext (LogContext):
the logcontext to which this scope is attached.
enter_logcontext (Boolean):
if True the logcontext will be entered and exited when the scope
is entered and exited respectively
if True the logcontext will be exited when the scope is finished
finish_on_close (Boolean):
if True finish the span when the scope is closed
"""
Expand All @@ -111,26 +127,28 @@ def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close)
self._finish_on_close = finish_on_close
self._enter_logcontext = enter_logcontext

def __enter__(self):
if self._enter_logcontext:
self.logcontext.__enter__()
def __exit__(self, exc_type, value, traceback):
if exc_type == twisted.internet.defer._DefGen_Return:
# filter out defer.returnValue() calls
exc_type = value = traceback = None
super().__exit__(exc_type, value, traceback)

return self

def __exit__(self, type, value, traceback):
if type == twisted.internet.defer._DefGen_Return:
super().__exit__(None, None, None)
else:
super().__exit__(type, value, traceback)
if self._enter_logcontext:
self.logcontext.__exit__(type, value, traceback)
else: # the logcontext existed before the creation of the scope
self.logcontext.scope = None
def __str__(self):
return f"Scope<{self.span}>"

def close(self):
if self.manager.active is not self:
logger.error("Tried to close a non-active scope!")
return
active_scope = self.manager.active
if active_scope is not self:
logger.error(
"Closing scope %s which is not the currently-active one %s",
self,
active_scope,
)

if self._finish_on_close:
self.span.finish()

self.logcontext.scope = None

if self._enter_logcontext:
self.logcontext.__exit__(None, None, None)
184 changes: 184 additions & 0 deletions tests/logging/test_opentracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactorClock

from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.opentracing import (
start_active_span,
start_active_span_follows_from,
)
from synapse.util import Clock

try:
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
LogContextScopeManager = None # type: ignore

try:
import jaeger_client
except ImportError:
jaeger_client = None # type: ignore

from tests.unittest import TestCase


class LogContextScopeManagerTestCase(TestCase):
if LogContextScopeManager is None:
skip = "Requires opentracing" # type: ignore[unreachable]
if jaeger_client is None:
skip = "Requires jaeger_client" # type: ignore[unreachable]

def setUp(self) -> None:
# since this is a unit test, we don't really want to mess around with the
# global variables that power opentracing. We create our own tracer instance
# and test with it.

scope_manager = LogContextScopeManager({})
config = jaeger_client.config.Config(
config={}, service_name="test", scope_manager=scope_manager
)

self._reporter = jaeger_client.reporter.InMemoryReporter()

self._tracer = config.create_tracer(
sampler=jaeger_client.ConstSampler(True),
reporter=self._reporter,
)

def test_start_active_span(self) -> None:
# the scope manager assumes a logging context of some sort.
with LoggingContext("root context"):
self.assertIsNone(self._tracer.active_span)

# start_active_span should start and activate a span.
scope = start_active_span("span", tracer=self._tracer)
span = scope.span
self.assertEqual(self._tracer.active_span, span)
self.assertIsNotNone(span.start_time)

# entering the context doesn't actually do a whole lot.
with scope as ctx:
self.assertIs(ctx, scope)
self.assertEqual(self._tracer.active_span, span)

# ... but leaving it unsets the active span, and finishes the span.
self.assertIsNone(self._tracer.active_span)
self.assertIsNotNone(span.end_time)

# the span should have been reported
self.assertEqual(self._reporter.get_spans(), [span])

def test_nested_spans(self) -> None:
"""Starting two spans off inside each other should work"""

with LoggingContext("root context"):
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)

scope1 = start_active_span(
"child1",
tracer=self._tracer,
)
self.assertEqual(
self._tracer.active_span, scope1.span, "child1 was not activated"
)
self.assertEqual(
scope1.span.context.parent_id, root_scope.span.context.span_id
)

scope2 = start_active_span_follows_from(
"child2",
contexts=(scope1,),
tracer=self._tracer,
)
self.assertEqual(self._tracer.active_span, scope2.span)
self.assertEqual(
scope2.span.context.parent_id, scope1.span.context.span_id
)

with scope1, scope2:
pass

# the root scope should be restored
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(self._tracer.active_span, root_scope.span)
self.assertIsNotNone(scope2.span.end_time)
self.assertIsNotNone(scope1.span.end_time)

self.assertIsNone(self._tracer.active_span)

# the spans should be reported in order of their finishing.
self.assertEqual(
self._reporter.get_spans(), [scope2.span, scope1.span, root_scope.span]
)

def test_overlapping_spans(self) -> None:
"""Overlapping spans which are not neatly nested should work"""
reactor = MemoryReactorClock()
clock = Clock(reactor)

scopes = []

async def task(i: int):
scope = start_active_span(
f"task{i}",
tracer=self._tracer,
)
scopes.append(scope)

self.assertEqual(self._tracer.active_span, scope.span)
await clock.sleep(4)
self.assertEqual(self._tracer.active_span, scope.span)
scope.close()

async def root():
with start_active_span("root span", tracer=self._tracer) as root_scope:
self.assertEqual(self._tracer.active_span, root_scope.span)
scopes.append(root_scope)

d1 = run_in_background(task, 1)
await clock.sleep(2)
d2 = run_in_background(task, 2)

# because we did run_in_background, the active span should still be the
# root.
self.assertEqual(self._tracer.active_span, root_scope.span)

await make_deferred_yieldable(
defer.gatherResults([d1, d2], consumeErrors=True)
)

self.assertEqual(self._tracer.active_span, root_scope.span)

with LoggingContext("root context"):
# start the test off
d1 = defer.ensureDeferred(root())

# let the tasks complete
reactor.pump((2,) * 8)

self.successResultOf(d1)
self.assertIsNone(self._tracer.active_span)

# the spans should be reported in order of their finishing: task 1, task 2,
# root.
self.assertEqual(
self._reporter.get_spans(),
[scopes[1].span, scopes[2].span, scopes[0].span],
)