Skip to content

Commit

Permalink
Experimental execute_isolated_fn method
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Dec 19, 2023
1 parent 067cc75 commit f27bef4
Showing 1 changed file with 42 additions and 11 deletions.
53 changes: 42 additions & 11 deletions datasette/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,43 @@ def count_params(params):
kwargs["count"] = count
return results

async def execute_isolated_fn(self, fn):
# Open a new connection just for the duration of this function
# blocking the write queue to avoid any writes occurring during it
if self.ds.executor is None:
# non-threaded mode
isolated_connection = self.connect(write=True)
try:
result = fn(isolated_connection)
finally:
isolated_connection.close()
self._all_file_connections.remove(isolated_connection)
return result
else:
# Threaded mode - send to write thread
return await self._send_to_write_thread(fn, isolated_connection=True)

async def execute_write_fn(self, fn, block=True):
if self.ds.executor is None:
# non-threaded mode
if self._write_connection is None:
self._write_connection = self.connect(write=True)
self.ds._prepare_connection(self._write_connection, self.name)
return fn(self._write_connection)
else:
return await self._send_to_write_thread(fn, block)

# threaded mode
task_id = uuid.uuid5(uuid.NAMESPACE_DNS, "datasette.io")
async def _send_to_write_thread(self, fn, block=True, isolated_connection=False):
if self._write_queue is None:
self._write_queue = queue.Queue()
if self._write_thread is None:
self._write_thread = threading.Thread(
target=self._execute_writes, daemon=True
)
self._write_thread.start()
task_id = uuid.uuid5(uuid.NAMESPACE_DNS, "datasette.io")
reply_queue = janus.Queue()
self._write_queue.put(WriteTask(fn, task_id, reply_queue))
self._write_queue.put(WriteTask(fn, task_id, reply_queue, isolated_connection))
if block:
result = await reply_queue.async_q.get()
if isinstance(result, Exception):
Expand All @@ -202,12 +220,24 @@ def _execute_writes(self):
if conn_exception is not None:
result = conn_exception
else:
try:
result = task.fn(conn)
except Exception as e:
sys.stderr.write("{}\n".format(e))
sys.stderr.flush()
result = e
if task.isolated_connection:
isolated_connection = self.connect(write=True)
try:
result = task.fn(isolated_connection)
except Exception as e:
sys.stderr.write("{}\n".format(e))
sys.stderr.flush()
result = e
finally:
isolated_connection.close()
self._all_file_connections.remove(isolated_connection)
else:
try:
result = task.fn(conn)
except Exception as e:
sys.stderr.write("{}\n".format(e))
sys.stderr.flush()
result = e
task.reply_queue.sync_q.put(result)

async def execute_fn(self, fn):
Expand Down Expand Up @@ -515,12 +545,13 @@ def __repr__(self):


class WriteTask:
__slots__ = ("fn", "task_id", "reply_queue")
__slots__ = ("fn", "task_id", "reply_queue", "isolated_connection")

def __init__(self, fn, task_id, reply_queue):
def __init__(self, fn, task_id, reply_queue, isolated_connection):
self.fn = fn
self.task_id = task_id
self.reply_queue = reply_queue
self.isolated_connection = isolated_connection


class QueryInterrupted(Exception):
Expand Down

1 comment on commit f27bef4

@simonw
Copy link
Owner Author

@simonw simonw commented on f27bef4 Dec 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.