Traces should include SQL executed by subtasks created with asyncio.gather #1576

Closed
simonw opened this issue Dec 22, 2021 · 12 comments

@simonw
Owner

simonw commented Dec 22, 2021

I tried running some parallel SQL queries using asyncio.gather() but the SQL that was executed didn't show up in the trace rendered by https://datasette.io/plugins/datasette-pretty-traces

I realized that was because traces are keyed against the current task ID, which changes when a sub-task is run using asyncio.gather or similar.

The faceting and suggest faceting queries are missing from this trace:

(Screenshot: trace output without the faceting queries)

The reason they aren't showing up in the traces is that traces are stored just for the currently executing asyncio task ID:

# asyncio.current_task was introduced in Python 3.7:
for obj in (asyncio, asyncio.Task):
    current_task = getattr(obj, "current_task", None)
    if current_task is not None:
        break

def get_task_id():
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        return None
    return id(current_task(loop=loop))

This is so traces for other incoming requests don't end up mixed together. But there's no current mechanism to track async tasks that are effectively "child tasks" of the current request, and hence should be tracked the same.

https://stackoverflow.com/a/69349501/6083 suggests that you pass the task ID as an argument to the child tasks that are executed using asyncio.gather() to work around this kind of problem.
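
A minimal sketch of both the problem and that workaround, assuming Python 3.7+ where asyncio.current_task() is available. The names task_id, child and main are made up for illustration and are not part of Datasette:

```python
import asyncio


def task_id():
    # Same idea as get_task_id() above: identify the running task by its id()
    return id(asyncio.current_task())


async def child(parent_id=None):
    # Without an explicit parent_id the child only knows its own task ID
    return parent_id if parent_id is not None else task_id()


async def main():
    parent = task_id()
    # gather() wraps each coroutine in a new Task, so these IDs differ from parent
    unaware = await asyncio.gather(child(), child())
    # Workaround: hand the parent's ID to each child explicitly
    aware = await asyncio.gather(child(parent), child(parent))
    return parent, unaware, aware


if __name__ == "__main__":
    parent, unaware, aware = asyncio.run(main())
    print(parent, unaware, aware)
```

The drawback is that every function on the way down to the traced call has to accept and forward the extra argument.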

Originally posted by @simonw in #1518 (comment)

@simonw
Owner Author

simonw commented Dec 22, 2021

Here's the full current relevant code from tracer.py:

tracers = {}

TRACE_RESERVED_KEYS = {"type", "start", "end", "duration_ms", "traceback"}

# asyncio.current_task was introduced in Python 3.7:
for obj in (asyncio, asyncio.Task):
    current_task = getattr(obj, "current_task", None)
    if current_task is not None:
        break

def get_task_id():
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        return None
    return id(current_task(loop=loop))

@contextmanager
def trace(type, **kwargs):
    assert not TRACE_RESERVED_KEYS.intersection(
        kwargs.keys()
    ), f".trace() keyword parameters cannot include {TRACE_RESERVED_KEYS}"
    task_id = get_task_id()
    if task_id is None:
        yield kwargs
        return
    tracer = tracers.get(task_id)
    if tracer is None:
        yield kwargs
        return
    start = time.perf_counter()
    yield kwargs
    end = time.perf_counter()
    trace_info = {
        "type": type,
        "start": start,
        "end": end,
        "duration_ms": (end - start) * 1000,
        "traceback": traceback.format_list(traceback.extract_stack(limit=6)[:-3]),
    }
    trace_info.update(kwargs)
    tracer.append(trace_info)

@contextmanager
def capture_traces(tracer):
    # tracer is a list
    task_id = get_task_id()
    if task_id is None:
        yield
        return
    tracers[task_id] = tracer
    yield
    del tracers[task_id]

@simonw
Owner Author

simonw commented Dec 22, 2021

One way to solve this would be to introduce a set_task_id() function that sets an ID for get_task_id() to return in place of id(current_task(loop=loop)).

It would be really nice if I could solve this using with syntax somehow. Something like:

with trace_child_tasks():
    (
        suggested_facets,
        (facet_results, facets_timed_out),
    ) = await asyncio.gather(
        execute_suggested_facets(),
        execute_facets(),
    )

@simonw
Owner Author

simonw commented Dec 22, 2021

This article is relevant: Context information storage for asyncio - in particular the section https://blog.sqreen.com/asyncio/#context-inheritance-between-tasks which describes exactly the problem I have and their solution, which involves this trickery:

def request_task_factory(loop, coro):
    child_task = asyncio.tasks.Task(coro, loop=loop)
    parent_task = asyncio.Task.current_task(loop=loop)
    current_request = getattr(parent_task, 'current_request', None)
    setattr(child_task, 'current_request', current_request)
    return child_task

loop = asyncio.get_event_loop()
loop.set_task_factory(request_task_factory)

They released their solution as a library: https://pypi.org/project/aiocontext/ and https://github.com/sqreen/AioContext - but that company was acquired by Datadog back in April and doesn't seem to be actively maintaining their open source stuff any more: https://twitter.com/SqreenIO/status/1384906075506364417
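
For reference, asyncio.Task.current_task() as used in the quoted snippet was deprecated in Python 3.7 and removed in 3.9. A rough sketch of the same task-factory idea using asyncio.current_task() instead might look like the following. The current_request attribute comes from the article's snippet; child and main are hypothetical, and the **kwargs pass-through is an assumption made so that any name or context arguments supplied by newer Python versions reach asyncio.Task:

```python
import asyncio


def request_task_factory(loop, coro, **kwargs):
    # Forward any extra keyword arguments (e.g. name, context) to the Task
    child_task = asyncio.Task(coro, loop=loop, **kwargs)
    # The task that is currently running is the one creating the child
    parent_task = asyncio.current_task(loop=loop)
    # Copy the request marker from parent to child (None if there is no parent)
    child_task.current_request = getattr(parent_task, "current_request", None)
    return child_task


async def child():
    # Runs in its own Task, which inherited the marker in the factory
    return getattr(asyncio.current_task(), "current_request", None)


async def main():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(request_task_factory)
    asyncio.current_task().current_request = "request-1"
    return await asyncio.gather(child(), child())


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This replaces the loop's task factory globally, which is the main reason the approach is awkward to combine with other code that wants its own factory.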

@simonw
Owner Author

simonw commented Dec 22, 2021

contextvars can solve this, but the module was only introduced in Python 3.7: https://www.python.org/dev/peps/pep-0567/

Python 3.6 support ends in a few days' time, and it looks like Glitch has updated to 3.7 now - so maybe I can get away with Datasette needing 3.7 these days?

Tweeted about that here: https://twitter.com/simonw/status/1473761478155010048
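
A small illustration of why contextvars fits this problem: each new asyncio task runs in a copy of the context of the code that created it, so a value set by a "request" is visible to its gather() children but not to other requests. The variable and function names here are hypothetical:

```python
import asyncio
from contextvars import ContextVar

# Hypothetical variable, for illustration only
current_request = ContextVar("current_request", default=None)


async def child():
    # Runs in its own Task, but sees a copy of the creating task's context
    return current_request.get()


async def handle(request_name):
    # Set inside this task's context; sibling requests are unaffected
    current_request.set(request_name)
    return await asyncio.gather(child(), child())


async def main():
    # gather() gives each handle() call its own task and its own context copy
    return await asyncio.gather(handle("request-a"), handle("request-b"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```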

@simonw
Owner Author

simonw commented Dec 23, 2021

Another option: https://github.com/Skyscanner/aiotask-context - looks like it might be better as it's been updated for Python 3.7 in this commit Skyscanner/aiotask-context@67108c9

The Skyscanner one doesn't attempt to wrap any existing factories, but that's OK for my purposes since I don't need to handle arbitrary asyncio code written by other people.

@simonw
Owner Author

simonw commented Dec 23, 2021

It's tiny: I'm tempted to vendor it. https://github.com/Skyscanner/aiotask-context/blob/master/aiotask_context/__init__.py

No, I'll add it as a pinned dependency, which I can then drop when I drop 3.6 support.

@simonw
Owner Author

simonw commented Dec 24, 2021

Another option would be to attempt to import contextvars and, if the import fails (for Python 3.6) continue using the current mechanism - then let Python 3.6 users know in the documentation that under Python 3.6 they will miss out on nested traces.

@simonw
Owner Author

simonw commented Jan 20, 2022

I dropped support for Python 3.6 in fae3983, so I'm now free to use contextvars for this.

@simonw
Owner Author

simonw commented Feb 2, 2022

Here's what I was hacking around with when I uncovered this problem:

diff --git a/datasette/views/table.py b/datasette/views/table.py
index 77fb285..8c57d08 100644
--- a/datasette/views/table.py
+++ b/datasette/views/table.py
@@ -1,3 +1,4 @@
+import asyncio
 import urllib
 import itertools
 import json
@@ -615,44 +616,37 @@ class TableView(RowTableShared):
         if request.args.get("_timelimit"):
             extra_args["custom_time_limit"] = int(request.args.get("_timelimit"))
 
-        # Execute the main query!
-        results = await db.execute(sql, params, truncate=True, **extra_args)
-
-        # Calculate the total count for this query
-        filtered_table_rows_count = None
-        if (
-            not db.is_mutable
-            and self.ds.inspect_data
-            and count_sql == f"select count(*) from {table} "
-        ):
-            # We can use a previously cached table row count
-            try:
-                filtered_table_rows_count = self.ds.inspect_data[database]["tables"][
-                    table
-                ]["count"]
-            except KeyError:
-                pass
-
-        # Otherwise run a select count(*) ...
-        if count_sql and filtered_table_rows_count is None and not nocount:
-            try:
-                count_rows = list(await db.execute(count_sql, from_sql_params))
-                filtered_table_rows_count = count_rows[0][0]
-            except QueryInterrupted:
-                pass
-
-        # Faceting
-        if not self.ds.setting("allow_facet") and any(
-            arg.startswith("_facet") for arg in request.args
-        ):
-            raise BadRequest("_facet= is not allowed")
+        async def execute_count():
+            # Calculate the total count for this query
+            filtered_table_rows_count = None
+            if (
+                not db.is_mutable
+                and self.ds.inspect_data
+                and count_sql == f"select count(*) from {table} "
+            ):
+                # We can use a previously cached table row count
+                try:
+                    filtered_table_rows_count = self.ds.inspect_data[database][
+                        "tables"
+                    ][table]["count"]
+                except KeyError:
+                    pass
+
+            if count_sql and filtered_table_rows_count is None and not nocount:
+                try:
+                    count_rows = list(await db.execute(count_sql, from_sql_params))
+                    filtered_table_rows_count = count_rows[0][0]
+                except QueryInterrupted:
+                    pass
+
+            return filtered_table_rows_count
+
+        filtered_table_rows_count = await execute_count()
 
         # pylint: disable=no-member
         facet_classes = list(
             itertools.chain.from_iterable(pm.hook.register_facet_classes())
         )
-        facet_results = {}
-        facets_timed_out = []
         facet_instances = []
         for klass in facet_classes:
             facet_instances.append(
@@ -668,33 +662,58 @@ class TableView(RowTableShared):
                 )
             )
 
-        if not nofacet:
-            for facet in facet_instances:
-                (
-                    instance_facet_results,
-                    instance_facets_timed_out,
-                ) = await facet.facet_results()
-                for facet_info in instance_facet_results:
-                    base_key = facet_info["name"]
-                    key = base_key
-                    i = 1
-                    while key in facet_results:
-                        i += 1
-                        key = f"{base_key}_{i}"
-                    facet_results[key] = facet_info
-                facets_timed_out.extend(instance_facets_timed_out)
-
-        # Calculate suggested facets
-        suggested_facets = []
-        if (
-            self.ds.setting("suggest_facets")
-            and self.ds.setting("allow_facet")
-            and not _next
-            and not nofacet
-            and not nosuggest
-        ):
-            for facet in facet_instances:
-                suggested_facets.extend(await facet.suggest())
+        async def execute_suggested_facets():
+            # Calculate suggested facets
+            suggested_facets = []
+            if (
+                self.ds.setting("suggest_facets")
+                and self.ds.setting("allow_facet")
+                and not _next
+                and not nofacet
+                and not nosuggest
+            ):
+                for facet in facet_instances:
+                    suggested_facets.extend(await facet.suggest())
+            return suggested_facets
+
+        async def execute_facets():
+            facet_results = {}
+            facets_timed_out = []
+            if not self.ds.setting("allow_facet") and any(
+                arg.startswith("_facet") for arg in request.args
+            ):
+                raise BadRequest("_facet= is not allowed")
+
+            if not nofacet:
+                for facet in facet_instances:
+                    (
+                        instance_facet_results,
+                        instance_facets_timed_out,
+                    ) = await facet.facet_results()
+                    for facet_info in instance_facet_results:
+                        base_key = facet_info["name"]
+                        key = base_key
+                        i = 1
+                        while key in facet_results:
+                            i += 1
+                            key = f"{base_key}_{i}"
+                        facet_results[key] = facet_info
+                    facets_timed_out.extend(instance_facets_timed_out)
+
+            return facet_results, facets_timed_out
+
+        # Execute the main query, facets and facet suggestions in parallel:
+        (
+            results,
+            suggested_facets,
+            (facet_results, facets_timed_out),
+        ) = await asyncio.gather(
+            db.execute(sql, params, truncate=True, **extra_args),
+            execute_suggested_facets(),
+            execute_facets(),
+        )
+
+        results = await db.execute(sql, params, truncate=True, **extra_args)
 
         # Figure out columns and rows for the query
         columns = [r[0] for r in results.description]

It's a hacky attempt at running some of the table page queries in parallel to see what happens.

@simonw simonw added this to the Datasette 1.0 milestone Feb 5, 2022
@simonw
Owner Author

simonw commented Feb 5, 2022

Got a prototype working with contextvars - it identified two queries executing in parallel using the patch from above:

(Screenshot: trace showing two queries executing in parallel)
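
For readers following along, here is a simplified, self-contained sketch of how a contextvars-based trace_child_tasks() could fit together with the tracer code quoted earlier in this thread. It is not necessarily what the linked commit does: the trace_task_id variable, fake_query and handle_request are assumptions for illustration, and the recorded trace is trimmed down to a few keys.

```python
import asyncio
import time
from contextlib import contextmanager
from contextvars import ContextVar

tracers = {}
# Hypothetical name: the ID that child tasks should report instead of their own
trace_task_id = ContextVar("trace_task_id", default=None)


def get_task_id():
    inherited = trace_task_id.get()
    if inherited is not None:
        return inherited
    try:
        task = asyncio.current_task()
    except RuntimeError:
        # No running event loop
        return None
    return id(task) if task is not None else None


@contextmanager
def trace_child_tasks():
    # Tasks created inside this block copy the context, including this value
    token = trace_task_id.set(get_task_id())
    try:
        yield
    finally:
        trace_task_id.reset(token)


@contextmanager
def capture_traces(tracer):
    # tracer is a list
    task_id = get_task_id()
    if task_id is None:
        yield
        return
    tracers[task_id] = tracer
    try:
        yield
    finally:
        del tracers[task_id]


@contextmanager
def trace(type, **kwargs):
    tracer = tracers.get(get_task_id())
    if tracer is None:
        yield kwargs
        return
    start = time.perf_counter()
    yield kwargs
    end = time.perf_counter()
    tracer.append({"type": type, "duration_ms": (end - start) * 1000, **kwargs})


async def fake_query(sql):
    # Stand-in for db.execute(); sleeps instead of running SQL
    with trace("sql", sql=sql):
        await asyncio.sleep(0.01)


async def handle_request():
    collected = []
    with capture_traces(collected):
        with trace_child_tasks():
            await asyncio.gather(fake_query("select 1"), fake_query("select 2"))
    return collected


if __name__ == "__main__":
    for item in asyncio.run(handle_request()):
        print(item)
```

Without the trace_child_tasks() block, the two fake_query() calls would look up their own task IDs, find no tracer, and record nothing, which is the behaviour described at the top of this issue.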

@simonw
Owner Author

simonw commented Feb 5, 2022

Needs documentation. I'll document from datasette.tracer import trace too.

@simonw simonw closed this as completed in da53e03 Feb 5, 2022
@simonw
Owner Author

simonw commented Feb 5, 2022

simonw added a commit that referenced this issue Feb 23, 2022
simonw added a commit that referenced this issue Mar 20, 2022
simonw added a commit that referenced this issue Mar 23, 2022