Skip to content

Commit

Permalink
Update tasks to inherit the lock manager from a parent transaction if…
Browse files Browse the repository at this point in the history
… present (#15505)
  • Loading branch information
desertaxle authored Sep 30, 2024
1 parent a55719c commit 2e695cc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ async def update_for_task(self: Self, task: "Task") -> Self:
Returns:
An updated result store.
"""
from prefect.transactions import get_transaction

update = {}
if task.result_storage is not None:
update["result_storage"] = await resolve_result_storage(task.result_storage)
Expand All @@ -330,12 +332,20 @@ async def update_for_task(self: Self, task: "Task") -> Self:
update["storage_key_fn"] = partial(
_format_user_supplied_storage_key, task.result_storage_key
)

# use the lock manager from a parent transaction if it exists
if (current_txn := get_transaction()) and isinstance(
current_txn.store, ResultStore
):
update["lock_manager"] = current_txn.store.lock_manager

if task.cache_policy is not None and task.cache_policy is not NotSet:
if task.cache_policy.key_storage is not None:
storage = task.cache_policy.key_storage
if isinstance(storage, str) and not len(storage.split("/")) == 2:
storage = Path(storage)
update["metadata_storage"] = await resolve_result_storage(storage)
# if the cache policy has a lock manager, it takes precedence over the parent transaction
if task.cache_policy.lock_manager is not None:
update["lock_manager"] = task.cache_policy.lock_manager

Expand Down
35 changes: 33 additions & 2 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@
from prefect.filesystems import LocalFileSystem
from prefect.futures import PrefectDistributedFuture, PrefectFuture
from prefect.locking.filesystem import FileSystemLockManager
from prefect.locking.memory import MemoryLockManager
from prefect.logging import get_run_logger
from prefect.results import ResultStore, get_or_create_default_task_scheduling_storage
from prefect.results import (
ResultStore,
get_or_create_default_task_scheduling_storage,
)
from prefect.runtime import task_run as task_run_ctx
from prefect.server import models
from prefect.settings import (
Expand All @@ -55,7 +59,13 @@
from prefect.states import State
from prefect.tasks import Task, task, task_input_hash
from prefect.testing.utilities import exceptions_equal
from prefect.transactions import CommitMode, IsolationLevel, Transaction, transaction
from prefect.transactions import (
CommitMode,
IsolationLevel,
Transaction,
get_transaction,
transaction,
)
from prefect.utilities.annotations import allow_failure, unmapped
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.collections import quote
Expand Down Expand Up @@ -5201,6 +5211,27 @@ def commit(txn):

assert "Running rollback hook" not in caplog.text

def test_run_task_in_serializable_transaction(self):
"""
Regression test for https://github.com/PrefectHQ/prefect/issues/15503
"""

@task
def my_task():
return get_transaction()

with transaction(
isolation_level=IsolationLevel.SERIALIZABLE,
store=ResultStore(lock_manager=MemoryLockManager()),
):
task_txn = my_task()

assert task_txn is not None
assert isinstance(task_txn.store, ResultStore)
# make sure the task's result store gets the lock manager from the parent transaction
assert task_txn.store.lock_manager == MemoryLockManager()
assert task_txn.isolation_level == IsolationLevel.SERIALIZABLE


class TestApplyAsync:
@pytest.mark.parametrize(
Expand Down

0 comments on commit 2e695cc

Please sign in to comment.