diff --git a/google/cloud/ndb/_cache.py b/google/cloud/ndb/_cache.py
index b611f8e9..13c16928 100644
--- a/google/cloud/ndb/_cache.py
+++ b/google/cloud/ndb/_cache.py
@@ -14,6 +14,7 @@
 
 import functools
 import itertools
+import logging
 import uuid
 import warnings
@@ -22,6 +23,7 @@
 from google.cloud.ndb import _batch
 from google.cloud.ndb import context as context_module
 from google.cloud.ndb import tasklets
+from google.cloud.ndb import utils
 
 _LOCKED_FOR_READ = b"0-"
 _LOCKED_FOR_WRITE = b"00"
@@ -29,6 +31,7 @@
 _PREFIX = b"NDB30"
 
 warnings.filterwarnings("always", module=__name__)
 
+log = logging.getLogger(__name__)
 
 class ContextCache(dict):
@@ -583,20 +586,28 @@ def future_info(self, key, value):
 
 
 @tasklets.tasklet
-def global_lock_for_read(key):
+def global_lock_for_read(key, prev_value):
     """Lock a key for a read (lookup) operation by setting a special value.
 
     Lock may be preempted by a parallel write (put) operation.
 
     Args:
         key (bytes): The key to lock.
+        prev_value (bytes): The cache value previously read from the global cache.
+            Should be either :data:`None` or an empty bytes object if a key was
+            written recently.
 
     Returns:
         tasklets.Future: Eventual result will be lock value (``bytes``) written to
             Datastore for the given key, or :data:`None` if the lock was not acquired.
     """
     lock = _LOCKED_FOR_READ + str(uuid.uuid4()).encode("ascii")
-    lock_acquired = yield global_set_if_not_exists(key, lock, expires=_LOCK_TIME)
+    if prev_value is not None:
+        yield global_watch(key, prev_value)
+        lock_acquired = yield global_compare_and_swap(key, lock, expires=_LOCK_TIME)
+    else:
+        lock_acquired = yield global_set_if_not_exists(key, lock, expires=_LOCK_TIME)
+
     if lock_acquired:
         raise tasklets.Return(lock)
 
@@ -618,6 +629,7 @@
     """
    lock = "." + str(uuid.uuid4())
    lock = lock.encode("ascii")
+    utils.logging_debug(log, "lock for write: {}", lock)
 
     def new_value(old_value):
         if old_value and old_value.startswith(_LOCKED_FOR_WRITE):
@@ -634,8 +646,7 @@ def new_value(old_value):
 def global_unlock_for_write(key, lock):
     """Remove a lock for key by updating or removing a lock value.
 
-    The lock represented by the ``lock`` argument will be released. If no other locks
-    remain, the key will be deleted.
+    The lock represented by the ``lock`` argument will be released.
 
     Args:
         key (bytes): The key to lock.
@@ -645,9 +656,15 @@
     Returns:
         tasklets.Future: Eventual result will be :data:`None`.
""" + utils.logging_debug(log, "unlock for write: {}", lock) def new_value(old_value): - return old_value.replace(lock, b"") + assert lock in old_value, "attempt to remove lock that isn't present" + value = old_value.replace(lock, b"") + if value == _LOCKED_FOR_WRITE: + value = b"" + + return value cache = _global_cache() try: @@ -663,19 +680,22 @@ def _update_key(key, new_value): while not success: old_value = yield _global_get(key) + utils.logging_debug(log, "old value: {}", old_value) + value = new_value(old_value) - if value == _LOCKED_FOR_WRITE: - # No more locks for this key, we can delete - yield _global_delete(key) - break + utils.logging_debug(log, "new value: {}", value) - if old_value: + if old_value is not None: + utils.logging_debug(log, "compare and swap") yield _global_watch(key, old_value) success = yield _global_compare_and_swap(key, value, expires=_LOCK_TIME) else: + utils.logging_debug(log, "set if not exists") success = yield global_set_if_not_exists(key, value, expires=_LOCK_TIME) + utils.logging_debug(log, "success: {}", success) + def is_locked_value(value): """Check if the given value is the special reserved value for key lock. diff --git a/google/cloud/ndb/_datastore_api.py b/google/cloud/ndb/_datastore_api.py index 80c11546..74dfd73f 100644 --- a/google/cloud/ndb/_datastore_api.py +++ b/google/cloud/ndb/_datastore_api.py @@ -142,12 +142,12 @@ def lookup(key, options): result = yield _cache.global_get(cache_key) key_locked = _cache.is_locked_value(result) if not key_locked: - if result is not None: + if result: entity_pb = entity_pb2.Entity() entity_pb.MergeFromString(result) elif use_datastore: - lock = yield _cache.global_lock_for_read(cache_key) + lock = yield _cache.global_lock_for_read(cache_key, result) if lock: yield _cache.global_watch(cache_key, lock) diff --git a/google/cloud/ndb/context.py b/google/cloud/ndb/context.py index 91520609..fdfe0ccb 100644 --- a/google/cloud/ndb/context.py +++ b/google/cloud/ndb/context.py @@ -17,14 +17,43 @@ import collections import contextlib +import itertools +import os import six import threading +import uuid from google.cloud.ndb import _eventloop from google.cloud.ndb import exceptions from google.cloud.ndb import key as key_module +def _generate_context_ids(): + """Generate a sequence of context ids. + + Useful for debugging complicated interactions among concurrent processes and + threads. + + The return value is a generator for strings that include the machine's "node", + acquired via `uuid.getnode()`, the current process id, and a sequence number which + increases monotonically starting from one in each process. The combination of all + three is sufficient to uniquely identify the context in which a particular piece of + code is being run. Each context, as it is created, is assigned the next id in this + sequence. The context id is used by `utils.logging_debug` to grant insight into + where a debug logging statement is coming from in a cloud evironment. + + Returns: + Generator[str]: Sequence of context ids. 
+    """
+    prefix = "{}-{}-".format(uuid.getnode(), os.getpid())
+    for sequence_number in itertools.count(1):  # pragma: NO BRANCH
+        # pragma is required because this loop never exits (infinite sequence)
+        yield prefix + str(sequence_number)
+
+
+_context_ids = _generate_context_ids()
+
+
 try:  # pragma: NO PY2 COVER
     import contextvars
@@ -199,6 +228,7 @@ def policy(key):
 _ContextTuple = collections.namedtuple(
     "_ContextTuple",
     [
+        "id",
         "client",
         "namespace",
         "eventloop",
@@ -234,6 +264,7 @@ class _Context(_ContextTuple):
     def __new__(
         cls,
         client,
+        id=None,
         namespace=key_module.UNDEFINED,
         eventloop=None,
         batches=None,
@@ -255,6 +286,9 @@ def __new__(
         # Prevent circular import in Python 2.7
         from google.cloud.ndb import _cache
 
+        if id is None:
+            id = next(_context_ids)
+
         if eventloop is None:
             eventloop = _eventloop.EventLoop()
@@ -272,6 +306,7 @@ def __new__(
         context = super(_Context, cls).__new__(
             cls,
+            id=id,
             client=client,
             namespace=namespace,
             eventloop=eventloop,
diff --git a/google/cloud/ndb/utils.py b/google/cloud/ndb/utils.py
index 8853bd18..6b4c1535 100644
--- a/google/cloud/ndb/utils.py
+++ b/google/cloud/ndb/utils.py
@@ -87,6 +87,13 @@ def logging_debug(log, message, *args, **kwargs):
         message = str(message)
         if args or kwargs:
             message = message.format(*args, **kwargs)
+
+        from google.cloud.ndb import context as context_module
+
+        context = context_module.get_context(False)
+        if context:
+            message = "{}: {}".format(context.id, message)
+
         log.debug(message)
 
 
diff --git a/tests/system/test_crud.py b/tests/system/test_crud.py
index 34d737a1..4b2d1249 100644
--- a/tests/system/test_crud.py
+++ b/tests/system/test_crud.py
@@ -96,7 +96,6 @@ class SomeKind(ndb.Model):
         baz = ndb.StringProperty()
 
     global_cache = global_cache_module._InProcessGlobalCache()
-    cache_dict = global_cache_module._InProcessGlobalCache.cache
 
     with client_context.new(global_cache=global_cache).use() as context:
         context.set_global_cache_policy(None)  # Use default
@@ -108,7 +107,9 @@ class SomeKind(ndb.Model):
         assert entity.baz == "night"
 
         cache_key = _cache.global_cache_key(key._key)
-        assert cache_key in cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert cache_value
+        assert not _cache.is_locked_value(cache_value)
 
         patch = mock.patch(
             "google.cloud.ndb._datastore_api._LookupBatch.add",
@@ -140,7 +141,9 @@ class SomeKind(ndb.Model):
     assert entity.baz == "night"
 
     cache_key = _cache.global_cache_key(key._key)
-    assert redis_context.global_cache.redis.get(cache_key) is not None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     patch = mock.patch(
         "google.cloud.ndb._datastore_api._LookupBatch.add",
@@ -173,7 +176,9 @@ class SomeKind(ndb.Model):
 
     cache_key = _cache.global_cache_key(key._key)
    cache_key = global_cache_module.MemcacheCache._key(cache_key)
-    assert memcache_context.global_cache.client.get(cache_key) is not None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     patch = mock.patch(
         "google.cloud.ndb._datastore_api._LookupBatch.add",
@@ -574,7 +579,6 @@ class SomeKind(ndb.Model):
         bar = ndb.StringProperty()
 
     global_cache = global_cache_module._InProcessGlobalCache()
-    cache_dict = global_cache_module._InProcessGlobalCache.cache
 
     with client_context.new(global_cache=global_cache).use() as context:
         context.set_global_cache_policy(None)  # Use default
@@ -582,18 +586,22 @@ class SomeKind(ndb.Model):
 
         key = entity.put()
         dispose_of(key._key)
         cache_key = _cache.global_cache_key(key._key)
-        assert not cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert not cache_value
 
         retrieved = key.get()
         assert retrieved.foo == 42
         assert retrieved.bar == "none"
 
-        assert cache_key in cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert cache_value
+        assert not _cache.is_locked_value(cache_value)
 
         entity.foo = 43
         entity.put()
 
-        assert cache_key not in cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert not cache_value
 
 
 @pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
@@ -606,18 +614,22 @@ class SomeKind(ndb.Model):
     key = entity.put()
     dispose_of(key._key)
     cache_key = _cache.global_cache_key(key._key)
-    assert redis_context.global_cache.redis.get(cache_key) is None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert not cache_value
 
     retrieved = key.get()
     assert retrieved.foo == 42
     assert retrieved.bar == "none"
 
-    assert redis_context.global_cache.redis.get(cache_key) is not None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     entity.foo = 43
     entity.put()
 
-    assert redis_context.global_cache.redis.get(cache_key) is None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert not cache_value
 
 
 @pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured")
@@ -631,18 +643,22 @@ class SomeKind(ndb.Model):
     dispose_of(key._key)
     cache_key = _cache.global_cache_key(key._key)
     cache_key = global_cache_module.MemcacheCache._key(cache_key)
-    assert memcache_context.global_cache.client.get(cache_key) is None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert not cache_value
 
     retrieved = key.get()
     assert retrieved.foo == 42
     assert retrieved.bar == "none"
 
-    assert memcache_context.global_cache.client.get(cache_key) is not None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     entity.foo = 43
     entity.put()
 
-    assert memcache_context.global_cache.client.get(cache_key) is None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert not cache_value
 
 
 @pytest.mark.usefixtures("client_context")
@@ -771,19 +787,22 @@ class SomeKind(ndb.Model):
     key = ndb.Key(KIND, entity_id)
     cache_key = _cache.global_cache_key(key._key)
     global_cache = global_cache_module._InProcessGlobalCache()
-    cache_dict = global_cache_module._InProcessGlobalCache.cache
 
     with client_context.new(global_cache=global_cache).use():
         assert key.get().foo == 42
-        assert cache_key in cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert cache_value
+        assert not _cache.is_locked_value(cache_value)
 
         assert key.delete() is None
-        assert cache_key not in cache_dict
+        cache_value = global_cache.get([cache_key])[0]
+        assert not cache_value
 
         # This is py27 behavior. Not entirely sold on leaving _LOCKED value for
         # Datastore misses.
         assert key.get() is None
-        assert cache_dict[cache_key][0].startswith(b"0-")
+        cache_value = global_cache.get([cache_key])[0]
+        assert _cache.is_locked_value(cache_value)
 
 
 @pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
@@ -798,15 +817,19 @@ class SomeKind(ndb.Model):
     cache_key = _cache.global_cache_key(key._key)
 
     assert key.get().foo == 42
-    assert redis_context.global_cache.redis.get(cache_key) is not None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     assert key.delete() is None
-    assert redis_context.global_cache.redis.get(cache_key) is None
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert not cache_value
 
     # This is py27 behavior. Not entirely sold on leaving _LOCKED value for
     # Datastore misses.
     assert key.get() is None
-    assert redis_context.global_cache.redis.get(cache_key).startswith(b"0-")
+    cache_value = redis_context.global_cache.redis.get(cache_key)
+    assert _cache.is_locked_value(cache_value)
 
 
 @pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured")
@@ -822,15 +845,19 @@ class SomeKind(ndb.Model):
     cache_key = global_cache_module.MemcacheCache._key(cache_key)
 
     assert key.get().foo == 42
-    assert memcache_context.global_cache.client.get(cache_key) is not None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert cache_value
+    assert not _cache.is_locked_value(cache_value)
 
     assert key.delete() is None
-    assert memcache_context.global_cache.client.get(cache_key) is None
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert not cache_value
 
     # This is py27 behavior. Not entirely sold on leaving _LOCKED value for
     # Datastore misses.
     assert key.get() is None
-    assert memcache_context.global_cache.client.get(cache_key).startswith(b"0-")
+    cache_value = memcache_context.global_cache.client.get(cache_key)
+    assert _cache.is_locked_value(cache_value)
 
 
 @pytest.mark.usefixtures("client_context")
diff --git a/tests/unit/test__cache.py b/tests/unit/test__cache.py
index 7eb3f519..bd222daf 100644
--- a/tests/unit/test__cache.py
+++ b/tests/unit/test__cache.py
@@ -827,17 +827,35 @@ class Test_global_lock_for_read:
     @mock.patch("google.cloud.ndb._cache.global_set_if_not_exists")
     def test_lock_acquired(global_set_if_not_exists):
         global_set_if_not_exists.return_value = _future_result(True)
-        assert (
-            _cache.global_lock_for_read(b"key")
-            .result()
-            .startswith(_cache._LOCKED_FOR_READ)
-        )
+        lock = _cache.global_lock_for_read(b"key", None).result()
+        assert lock.startswith(_cache._LOCKED_FOR_READ)
 
     @staticmethod
     @mock.patch("google.cloud.ndb._cache.global_set_if_not_exists")
     def test_lock_not_acquired(global_set_if_not_exists):
         global_set_if_not_exists.return_value = _future_result(False)
-        assert _cache.global_lock_for_read(b"key").result() is None
+        lock = _cache.global_lock_for_read(b"key", None).result()
+        assert lock is None
+
+    @staticmethod
+    @mock.patch("google.cloud.ndb._cache.global_compare_and_swap")
+    @mock.patch("google.cloud.ndb._cache.global_watch")
+    def test_recently_written_and_lock_acquired(global_watch, global_compare_and_swap):
+        global_watch.return_value = _future_result(True)
+        global_compare_and_swap.return_value = _future_result(True)
+        lock = _cache.global_lock_for_read(b"key", _cache._LOCKED_FOR_WRITE).result()
+        assert lock.startswith(_cache._LOCKED_FOR_READ)
+
+    @staticmethod
+    @mock.patch("google.cloud.ndb._cache.global_compare_and_swap")
+    @mock.patch("google.cloud.ndb._cache.global_watch")
+    def test_recently_written_and_lock_not_acquired(
+        global_watch, global_compare_and_swap
+    ):
+        global_watch.return_value = _future_result(True)
+        global_compare_and_swap.return_value = _future_result(False)
+        lock = _cache.global_lock_for_read(b"key", _cache._LOCKED_FOR_WRITE).result()
+        assert lock is None
 
 
 @pytest.mark.usefixtures("in_context")
@@ -914,10 +932,13 @@ def test_not_first_time_fail_once(
 class Test_global_unlock_for_write:
     @staticmethod
     @mock.patch("google.cloud.ndb._cache.uuid")
-    @mock.patch("google.cloud.ndb._cache._global_delete")
+    @mock.patch("google.cloud.ndb._cache._global_compare_and_swap")
+    @mock.patch("google.cloud.ndb._cache._global_watch")
     @mock.patch("google.cloud.ndb._cache._global_get")
     @mock.patch("google.cloud.ndb._cache._global_cache")
-    def test_last_time(_global_cache, _global_get, _global_delete, uuid):
+    def test_last_time(
+        _global_cache, _global_get, _global_watch, _global_compare_and_swap, uuid
+    ):
         lock = b".arandomuuid"
 
         _global_cache.return_value = mock.Mock(
@@ -928,18 +949,20 @@ def test_last_time(_global_cache, _global_get, _global_delete, uuid):
 
         lock_value = _cache._LOCKED_FOR_WRITE + lock
         _global_get.return_value = _future_result(lock_value)
-        _global_delete.return_value = _future_result(None)
+        _global_watch.return_value = _future_result(None)
+        _global_compare_and_swap.return_value = _future_result(True)
 
         assert _cache.global_unlock_for_write(b"key", lock).result() is None
         _global_get.assert_called_once_with(b"key")
-        _global_delete.assert_called_once_with(b"key")
+        _global_watch.assert_called_once_with(b"key", lock_value)
+        _global_compare_and_swap.assert_called_once_with(b"key", b"", expires=32)
 
     @staticmethod
     @mock.patch("google.cloud.ndb._cache.uuid")
-    @mock.patch("google.cloud.ndb._cache._global_delete")
+    @mock.patch("google.cloud.ndb._cache._global_watch")
     @mock.patch("google.cloud.ndb._cache._global_get")
     @mock.patch("google.cloud.ndb._cache._global_cache")
-    def test_transient_error(_global_cache, _global_get, _global_delete, uuid):
+    def test_transient_error(_global_cache, _global_get, _global_watch, uuid):
         class TransientError(Exception):
             pass
@@ -953,11 +976,11 @@ class TransientError(Exception):
 
         lock_value = _cache._LOCKED_FOR_WRITE + lock
         _global_get.return_value = _future_result(lock_value)
-        _global_delete.return_value = _future_exception(TransientError())
+        _global_watch.return_value = _future_exception(TransientError())
 
         assert _cache.global_unlock_for_write(b"key", lock).result() is None
         _global_get.assert_called_once_with(b"key")
-        _global_delete.assert_called_once_with(b"key")
+        _global_watch.assert_called_once_with(b"key", lock_value)
 
     @staticmethod
     @mock.patch("google.cloud.ndb._cache.uuid")
@@ -1009,6 +1032,7 @@ def test_not_last_time_fail_once(
 
 def test_is_locked_value():
     assert _cache.is_locked_value(_cache._LOCKED_FOR_READ)
     assert _cache.is_locked_value(_cache._LOCKED_FOR_WRITE + b"whatever")
+    assert not _cache.is_locked_value(b"")
     assert not _cache.is_locked_value(b"new db, who dis?")
     assert not _cache.is_locked_value(None)
diff --git a/tests/unit/test__datastore_api.py b/tests/unit/test__datastore_api.py
index d2e3a0ee..f5cb0246 100644
--- a/tests/unit/test__datastore_api.py
+++ b/tests/unit/test__datastore_api.py
@@ -714,7 +714,7 @@ class SomeKind(model.Model):
         future = _api.put(model._entity_to_ds_entity(entity), _options.Options())
         assert future.result() is None
 
-        assert global_cache.get([cache_key]) == [None]
+        assert not global_cache.get([cache_key])[0]
 
     @staticmethod
     @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
@@ -733,7 +733,7 @@ class SomeKind(model.Model):
         future = _api.put(model._entity_to_ds_entity(entity), _options.Options())
         assert future.result() == key._key
 
-        assert global_cache.get([cache_key]) == [None]
+        assert not global_cache.get([cache_key])[0]
 
     @staticmethod
     @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
@@ -760,7 +760,8 @@ class SomeKind(model.Model):
         for callback in callbacks:
             callback()
 
-        assert cache_key not in global_cache.cache  # unlocked by callback
+        # lock removed by callback
+        assert not global_cache.get([cache_key])[0]
 
     @staticmethod
     @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
@@ -869,7 +870,7 @@ def test_cache_enabled(Batch, global_cache):
         future = _api.delete(key._key, _options.Options())
         assert future.result() is None
 
-        assert global_cache.get([cache_key]) == [None]
+        assert not global_cache.get([cache_key])[0]
 
     @staticmethod
     @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
@@ -892,7 +893,8 @@ def test_w_transaction(Batch, global_cache):
         for callback in callbacks:
             callback()
 
-        assert cache_key not in global_cache.cache  # lock removed by callback
+        # lock removed by callback
+        assert not global_cache.get([cache_key])[0]
 
     @staticmethod
     @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch")
diff --git a/tests/unit/test_concurrency.py b/tests/unit/test_concurrency.py
new file mode 100644
index 00000000..742cbc09
--- /dev/null
+++ b/tests/unit/test_concurrency.py
@@ -0,0 +1,151 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+
+try:
+    from unittest import mock
+except ImportError:  # pragma: NO PY3 COVER
+    import mock
+
+from google.cloud.ndb import _cache
+from google.cloud.ndb import _eventloop
+from google.cloud.ndb import global_cache as global_cache_module
+from google.cloud.ndb import tasklets
+from google.cloud.ndb import utils
+
+
+log = logging.getLogger(__name__)
+
+
+class Delay(object):
+    """A tasklet wrapper which delays the return of a tasklet.
+
+    Used to orchestrate timing of events in async code to test particular scenarios
+    involving concurrency. Use with `mock.patch` to replace particular tasklets with
+    wrapped versions. When those tasklets are called, they will execute and then the
+    wrapper will hang on to the result until :meth:`Delay.advance` is called, at which
+    time the tasklet's caller will receive the result.
+
+    Args:
+        wrapped (tasklets.Tasklet): The tasklet to be delayed.
+ """ + + def __init__(self, wrapped): + self.wrapped = wrapped + self.info = "Delay {}".format(self.wrapped.__name__) + self._futures = collections.deque() + + @tasklets.tasklet + def __call__(self, *args, **kwargs): + future = tasklets.Future(self.info) + self._futures.append(future) + + result = yield self.wrapped(*args, **kwargs) + yield future + raise tasklets.Return(result) + + def advance(self): + """Allow a call to the wrapper to proceed. + + Calls are advanced in the order in which they were orignally made. + """ + self._futures.popleft().set_result(None) + + +def run_until(): + """Do all queued work on the event loop. + + This will allow any currently running tasklets to execute up to the point that they + hit a call to a tasklet that is delayed by :class:`Delay`. When this call is + finished, either all in progress tasklets will have been completed, or a call to + :class:`Delay.advance` will be required to move execution forward again. + """ + while _eventloop.run1(): + pass + + +def test_global_cache_concurrent_writes_692(in_context): + """Regression test for #692 + + https://github.com/googleapis/python-ndb/issues/692 + """ + key = b"somekey" + + @tasklets.tasklet + def run_test(): + lock1 = yield _cache.global_lock_for_write(key) + lock2, _ = yield ( + _cache.global_lock_for_write(key), + _cache.global_unlock_for_write(key, lock1), + ) + yield _cache.global_unlock_for_write(key, lock2) + + delay_global_get = Delay(_cache.global_get) + with mock.patch("google.cloud.ndb._cache._global_get", delay_global_get): + global_cache = global_cache_module._InProcessGlobalCache() + with in_context.new(global_cache=global_cache).use(): + future = run_test() + + # Run until the global_cache_get call in the first global_lock_for_write + # call + run_until() + utils.logging_debug(log, "zero") + + # Let the first global_cache_get call return and advance to the + # global_cache_get calls in the first call to global_unlock_for_write and + # second call to global_lock_for_write. They will have both gotten the same + # "old" value from the cache + delay_global_get.advance() + run_until() + utils.logging_debug(log, "one") + + # Let the global_cache_get call return in the second global_lock_for_write + # call. It should write a new lock value containing both locks. + delay_global_get.advance() + run_until() + utils.logging_debug(log, "two") + + # Let the global_cache_get call return in the first global_unlock_for_write + # call. Since its "old" cache value contained only the first lock, it might + # think it's done and delete the key, since as far as it's concerned, there + # are no more locks. This is the bug exposed by this test. + delay_global_get.advance() + run_until() + utils.logging_debug(log, "three") + + # Since we've fixed the bug now, what we expect it to do instead is attempt + # to write a new cache value that is a write lock value but contains no + # locks. This attempt will fail since the cache value was changed out from + # under it by the second global_lock_write call occurring in parallel. When + # this attempt fails it will call global_get again to get the new value + # containing both locks and recompute a value that only includes the second + # lock and write it. + delay_global_get.advance() + run_until() + utils.logging_debug(log, "four") + + # Now the last call to global_unlock_for_write will call global_get to get + # the current lock value with only one write lock, and then write an empty + # write lock. 
+            delay_global_get.advance()
+            run_until()
+            utils.logging_debug(log, "five")
+
+            # Make sure we can get to the end without raising an exception
+            future.result()
+
+            # Make sure the empty write lock registers as "not locked".
+            assert not _cache.is_locked_value(_cache.global_get(key).result())
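
Reviewer note, not part of the patch: the sketch below restates in plain, synchronous Python how write-lock values compose and why the patched global_unlock_for_write rewrites the remaining locks instead of deleting the key, which is the race fixed here (#692). The helper names add_write_lock and remove_write_lock are illustrative only; just the constant and the removal logic mirror _cache.py.

    _LOCKED_FOR_WRITE = b"00"  # mirrors _cache._LOCKED_FOR_WRITE

    def add_write_lock(old_value, lock):
        # Mirrors new_value() in global_lock_for_write: append this lock to an
        # existing write-lock value, or start a fresh one.
        if old_value and old_value.startswith(_LOCKED_FOR_WRITE):
            return old_value + lock
        return _LOCKED_FOR_WRITE + lock

    def remove_write_lock(old_value, lock):
        # Mirrors new_value() in the patched global_unlock_for_write: strip this
        # lock, collapsing a bare _LOCKED_FOR_WRITE prefix to an empty value.
        assert lock in old_value, "attempt to remove lock that isn't present"
        value = old_value.replace(lock, b"")
        if value == _LOCKED_FOR_WRITE:
            value = b""
        return value

    lock1, lock2 = b".uuid-one", b".uuid-two"
    value = add_write_lock(None, lock1)      # b"00.uuid-one"
    value = add_write_lock(value, lock2)     # b"00.uuid-one.uuid-two"
    value = remove_write_lock(value, lock1)  # b"00.uuid-two": key must survive
    assert value == _LOCKED_FOR_WRITE + lock2
    value = remove_write_lock(value, lock2)  # b"": reads as "not locked"
    assert value == b""

Under the old code, releasing what looked like the last lock deleted the key outright, so a concurrent global_lock_for_write that had read the same "old" value could have its lock silently dropped. Writing the new value through _global_watch/_global_compare_and_swap instead makes the stale writer fail and retry against the value that now contains both locks.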
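A second sketch, equally illustrative: global_lock_for_read now receives prev_value, the value the caller already read. An empty bytes value means "recently written", so the key exists and set-if-not-exists can never succeed; the lock has to go through watch/compare-and-swap instead. A plain dict stands in for the global cache and synchronous code for the tasklets; only the constant mirrors _cache.py.

    import uuid

    _LOCKED_FOR_READ = b"0-"  # mirrors _cache._LOCKED_FOR_READ
    cache = {}  # stand-in for the global cache

    def lock_for_read(key, prev_value):
        lock = _LOCKED_FOR_READ + str(uuid.uuid4()).encode("ascii")
        if prev_value is not None:
            # Key exists (possibly as b"", the "recently written" marker):
            # emulate watch/compare_and_swap -- swap only if unchanged.
            if cache.get(key) == prev_value:
                cache[key] = lock
                return lock
            return None
        # Key absent: emulate set_if_not_exists.
        if key not in cache:
            cache[key] = lock
            return lock
        return None

    cache[b"k1"] = b""  # a put just cleared this key
    assert lock_for_read(b"k1", b"") is not None  # compare-and-swap path wins
    cache[b"k2"] = b"changed"
    assert lock_for_read(b"k2", b"") is None  # value moved; lock not acquired

This is also why lookup() in _datastore_api.py now tests `if result:` and passes result through: the empty value written by global_unlock_for_write is falsy (so it is not mistaken for a serialized entity) but not None (so the read lock takes the compare-and-swap path).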