Skip to content

Commit

Permalink
Pause() and Cancel() exceptions, closes #56
Browse files Browse the repository at this point in the history
Also modified _enrichment_progress to use a new 2025 epoch timestamp

Job detail page now shows progress bar more neatly

Job detail page shows logged pause/cancel messages - refs #51
  • Loading branch information
simonw committed Jan 15, 2025
1 parent 04da41d commit 631e256
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 38 deletions.
77 changes: 65 additions & 12 deletions datasette_enrichments/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from abc import ABC, abstractmethod
import asyncio
import datetime
from datasette import hookimpl, NotFound, Response
from datasette.utils import async_call_with_supported_arguments, tilde_encode, sqlite3
from datasette_secrets import Secret, get_secret
import json
import secrets
import sys
import time
import traceback
import urllib
from datasette.plugins import pm
Expand All @@ -30,6 +32,15 @@

IdType = Union[int, str, Tuple[Union[int, str], ...]]

# Custom epoch to save space in the _enrichment_progress table
JAN_1_2025_EPOCH = int(datetime.datetime(2025, 1, 1).timestamp() * 1000)


def ms_since_2025_to_datetime(ms_since_2025):
unix_ms = JAN_1_2025_EPOCH + ms_since_2025
unix_seconds = unix_ms / 1000
return datetime.datetime.fromtimestamp(unix_seconds, tz=datetime.timezone.utc)


async def get_enrichments(datasette):
enrichments = []
Expand Down Expand Up @@ -64,8 +75,10 @@ async def get_enrichments(datasette):
create table if not exists _enrichment_progress (
id integer primary key,
job_id integer references _enrichment_jobs(id),
timestamp_ms_2025 integer, -- milliseconds since 2025-01-01
success_count integer,
error_count integer
error_count integer,
message text
)
""".strip()

Expand All @@ -91,7 +104,7 @@ async def set_job_status(
job_id: int,
status: str,
allowed_statuses: Optional[Tuple[str]] = None,
reason: Optional[str] = None,
message: Optional[str] = None,
):
if allowed_statuses:
# First check the current status
Expand All @@ -111,19 +124,36 @@ async def set_job_status(
{}
where id = :job_id
""".format(
", cancel_reason = :cancel_reason" if reason else ""
", cancel_reason = :cancel_reason"
if (message and status == "cancelled")
else ""
),
{"status": status, "job_id": job_id, "cancel_reason": reason},
{"status": status, "job_id": job_id, "cancel_reason": message},
)
progress_message = status
if message:
progress_message += ": " + message
await record_progress(db, job_id, 0, 0, progress_message)


async def record_progress(db, job_id, success_count, error_count):
async def record_progress(db, job_id, success_count, error_count, message=""):
await db.execute_write(
"""
insert into _enrichment_progress (job_id, success_count, error_count)
values (?, ?, ?)
""",
(job_id, success_count, error_count),
insert into _enrichment_progress (
job_id, timestamp_ms_2025, success_count, error_count, message
) values (
:job_id, :timestamp_ms_2025, :success_count, :error_count, {}
)
""".format(
":message" if message else "null"
),
{
"job_id": job_id,
"timestamp_ms_2025": int(time.time() * 1000) - JAN_1_2025_EPOCH,
"success_count": success_count,
"error_count": error_count,
"message": message,
},
)


Expand All @@ -148,6 +178,20 @@ class Enrichment(ABC):
default_max_errors: int = 5
log_traceback: bool = False

class Cancel(Exception):
def __init__(self, reason: Optional[str] = None):
self.reason = reason

def __str__(self) -> str:
return self.reason or "Cancelled by enrichment"

class Pause(Exception):
def __init__(self, reason: Optional[str] = None):
self.reason = reason

def __str__(self) -> str:
return self.reason or "Paused by enrichment"

@property
@abstractmethod
def slug(self):
Expand Down Expand Up @@ -408,6 +452,12 @@ async def run_enrichment():
if success_count is None:
success_count = len(rows)
await record_progress(db, job_id, success_count, 0)
except self.Cancel as ex:
await set_job_status(db, job_id, "cancelled", message=str(ex))
return
except self.Pause as ex:
await set_job_status(db, job_id, "paused", message=str(ex))
return
except Exception as ex:
await self.log_error(db, job_id, pks_for_rows(rows, pks), str(ex))
# Update next_cursor
Expand Down Expand Up @@ -730,11 +780,13 @@ class JobProgress extends HTMLElement {
`;
const template = document.createElement('template');
let h2 = `<h2><a href="#" class="job-link"></a></h2>`;
if (this.getAttribute('hide-title')) {
h2 = h2.replace('<h2>', '<h2 style="display: none;">');
}
template.innerHTML = `
<div class="container" role="region" aria-label="Task enrichment progress">
<h2>
<a href="#" class="job-link"></a>
</h2>
${h2}
<div class="progress-wrapper" role="progressbar" aria-valuemin="0" aria-valuemax="100" aria-valuenow="0">
</div>
<div class="progress-stats" aria-live="polite">
Expand Down Expand Up @@ -804,6 +856,7 @@ class JobProgress extends HTMLElement {
}
};
this.pollInterval = setInterval(poll, pollMs);
poll();
}
stopPolling() {
Expand Down
31 changes: 27 additions & 4 deletions datasette_enrichments/templates/enrichment_job.html
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ <h1>Enrichment run against <a href="{{ urls.table(job.database_name, job.table_n
{% if enrichment %}
<p>Ran <strong>{{ enrichment.name }}</strong> at {{ job.started_at }}, status is <strong>{{ job.status }}</strong>.
{% if job.error_count %}
<a href="{{ urls.table(job.database_name, "_enrichment_errors") }}?_sort_desc=id&amp;job_id={{ job.id }}">{{ job.error_count }} error{{ job.error_count == 1 and "" or "s" }}</a>
<a href="{{ urls.table(job.database_name, "_enrichment_errors") }}?_sort_desc=id&amp;job_id={{ job.id }}">{{ job.error_count }} error{% if job.error_count != 1 %}s{% endif %}</a>
{% endif %}
</p>
{% endif %}
Expand All @@ -51,10 +51,14 @@ <h1>Enrichment run against <a href="{{ urls.table(job.database_name, job.table_n
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<input type="submit" class="core" value="Resume">
</form>
<form style="display: inline-block;" action="{{ urls.path("/-/enrich/") }}{{ job.database_name }}/-/jobs/{{ job.id }}/cancel" method="post">
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<input type="submit" class="core" value="Cancel">
</form>
{% endif %}
</p>

<job-progress{% if job.status != "running" %} poll-interval="10000"{% endif %} api-url="{{ urls.path("/-/enrichment-jobs/" + job.database_name) }}/{{ job.id }}"></job-progress>
<job-progress{% if job.status != "running" %} poll-interval="5000"{% endif %} api-url="{{ urls.path("/-/enrichment-jobs/" + job.database_name) }}/{{ job.id }}" hide-title="1"></job-progress>

<style>
dt {
Expand All @@ -66,8 +70,18 @@ <h1>Enrichment run against <a href="{{ urls.table(job.database_name, job.table_n
dd dl {
margin-left: 1em;
}
.twocols {
display: flex;
gap: 2rem;
margin-top: 1rem;
}
.twocols > dl {
flex: 1;
}
</style>

<div class="twocols">

<dl>
<dt>id</dt>
<dd>{{ job.id }}</dd>
Expand All @@ -88,7 +102,6 @@ <h1>Enrichment run against <a href="{{ urls.table(job.database_name, job.table_n
<li><strong>{{ key }}</strong>: {{ value }}</li>
{% endfor %}
</ul>
</dl>
</dd>
<dt>started_at</dt>
<dd>{{ job.started_at }}</dd>
Expand All @@ -108,8 +121,18 @@ <h1>Enrichment run against <a href="{{ urls.table(job.database_name, job.table_n
<dd>{{ job.actor_id }}</dd>
</dl>

{% if messages %}
<dl>
{% for message in messages %}
<dt>{{ message.timestamp.isoformat().split('.')[0] }}</dt>
<dd>{{ message.message }}</dd>
{% endfor %}
</dl>
{% endif %}
</div>

<script>
{{ custom_element }}
{{ custom_element|safe }}
</script>

{% endblock %}
52 changes: 42 additions & 10 deletions datasette_enrichments/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ async def check_permissions(datasette, request, database):

async def job_view(datasette, request):
"Page showing details of an enrichment job"
from . import get_enrichments, ensure_tables, CUSTOM_ELEMENT_JS
from . import (
get_enrichments,
ensure_tables,
CUSTOM_ELEMENT_JS,
JAN_1_2025_EPOCH,
ms_since_2025_to_datetime,
)

job_id = request.url_vars["job_id"]
database = request.url_vars["database"]
Expand All @@ -43,6 +49,24 @@ async def job_view(datasette, request):
job["enrichment"]
) # May be None if plugin not installed

messages = [
dict(row)
for row in (
await db.execute(
"""
select timestamp_ms_2025, message
from _enrichment_progress
where job_id = ?
and message is not null
order by id
""",
(job_id,),
)
).rows
]
for message in messages:
message["timestamp"] = ms_since_2025_to_datetime(message["timestamp_ms_2025"])

job = dict(job)
config = json.loads(job["config"])
return Response.html(
Expand All @@ -54,6 +78,7 @@ async def job_view(datasette, request):
"config": config,
"enrichment": enrichment,
"custom_element": CUSTOM_ELEMENT_JS,
"messages": messages,
},
request,
)
Expand Down Expand Up @@ -360,16 +385,20 @@ async def job_progress_view(datasette, request):
)


async def pause_job(db, job_id):
async def pause_job(db, job_id, message):
from . import set_job_status

await set_job_status(db, job_id, "paused", allowed_statuses=("running",))
await set_job_status(
db, job_id, "paused", allowed_statuses=("running",), message=message
)


async def resume_job(datasette, db, job_id):
async def resume_job(datasette, db, job_id, message):
from . import set_job_status, get_enrichments

await set_job_status(db, job_id, "running", allowed_statuses=("paused",))
await set_job_status(
db, job_id, "running", allowed_statuses=("paused",), message=message
)
all_enrichments = await get_enrichments(datasette)
job = dict(
(
Expand All @@ -380,30 +409,33 @@ async def resume_job(datasette, db, job_id):
await enrichment.start_enrichment_in_process(datasette, db, job_id)


async def cancel_job(db, job_id, reason: Optional[str] = None):
async def cancel_job(db, job_id, message):
from . import set_job_status

await set_job_status(
db,
job_id,
"cancelled",
allowed_statuses=("running", "paused", "pending"),
reason=reason,
message=message,
)


async def update_job_status_view(datasette, request, action):
db = datasette.get_database(request.url_vars["database"])
message = ""
if request.actor and request.actor.get("id"):
message = "by {}".format(request.actor.get("id"))
job_id = int(request.url_vars["job_id"])
if request.method != "POST":
return Response("POST required", status=400)
try:
if action == "pause":
await pause_job(db, job_id)
await pause_job(db, job_id, message)
elif action == "resume":
await resume_job(datasette, db, job_id)
await resume_job(datasette, db, job_id, message)
elif action == "cancel":
await cancel_job(db, job_id)
await cancel_job(db, job_id, message)
except ValueError as ve:
return Response(str(ve), status=400)
return Response.redirect(
Expand Down
16 changes: 16 additions & 0 deletions docs/developing.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ Your method can optionally return an integer count of the number of rows that we

If you do not return a count, the system will assume that a call to `enrich_batch()` which did not raise an exception processed all of the rows that were passed to it.

#### Pausing or cancelling the run

Code inside a `enrich_batch()` method can request that the run be paused or cancelled by raising special exceptions.

```python
async def enrich_batch(...):
...
if no_tokens_left:
raise self.Pause("Ran out of tokens")
```
Or to cancel the run entirely:
```python
raise self.Cancel("Code snippet did not compile")
```
Messages logged here will be visible on the job detail page.

### get_config_form()

The `get_config_form()` method can optionally be implemented to return a [WTForms](https://wtforms.readthedocs.io/) form class that the user can use to configure the enrichment.
Expand Down
4 changes: 4 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ async def enrich_batch(
):
row = rows[0]
result = await datasette.enrichment_queue.get()
if result == "pause":
raise self.Pause("pause message")
if result == "cancel":
raise self.Cancel("cancel message")
datasette.enrichment_processed_count += 1
wheres = " and ".join(f'"{pk}" = ?' for pk in pks)
await db.execute_write(
Expand Down
Loading

0 comments on commit 631e256

Please sign in to comment.