Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to stabilize asyncioreactor for multiple versions of python #1189

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from cassandra.connection import Connection, ConnectionShutdown
import threading

from cassandra.connection import Connection, ConnectionShutdown
import sys
import asyncio
import logging
import os
Expand Down Expand Up @@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs):

self._connect_socket()
self._socket.setblocking(0)

self._write_queue = asyncio.Queue()
self._write_queue_lock = asyncio.Lock()
loop_args = dict()
if sys.version_info[0] == 3 and sys.version_info[1] < 10:
loop_args['loop'] = self._loop
self._write_queue = asyncio.Queue(**loop_args)
self._write_queue_lock = asyncio.Lock(**loop_args)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This came up when we were working on PYTHON-1313. Docs I found at the time indicated that since Python 3.7 the event loop had been derived from the current threads event loop and the "loop" param was essentially ignored. We're currently looking at 3.8 through 3.12 so this block only applies to Python 3.8.x and 3.9.x. Do you think it's worth it to continue to pass a "loop" param for those cases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it seems to be needed, I.e. taking it down got it broke.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


# see initialize_reactor -- loop is running in a separate thread, so we
# have to use a threadsafe call
Expand All @@ -108,8 +112,11 @@ def initialize_reactor(cls):
if cls._pid != os.getpid():
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
try:
cls._loop = asyncio.get_running_loop()
Copy link
Collaborator

@absurdfarce absurdfarce May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 get_running_loop() is preferred as the entry point over get_event_loop() per asyncio docs. No obvious reason it wouldn't be preferred to new_event_loop() + set_event_loop() as well.

Copy link

@aisven aisven Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@absurdfarce turns out that in the meantime in a fork this part was refined ⚠️

see code change at

https://github.com/scylladb/python-driver/pull/327/files#diff-932c40bc0835ee500494816d5cfa78f820240c9265c6582cd789356ea56f6447R124

and respective explanation at

scylladb#327 (comment)

except RuntimeError:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)

if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
Expand Down Expand Up @@ -162,7 +169,7 @@ def push(self, data):
else:
chunks = [data]

if self._loop_thread.ident != get_ident():
if self._loop_thread != threading.current_thread():
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 Since self._loop_thread != threading.current_thread() is a common and direct way to verify that the current thread is not the specified thread, no matter how the thread idents are typed, created, stored, updated.

asyncio.run_coroutine_threadsafe(
self._push_msg(chunks),
loop=self._loop
Expand All @@ -173,7 +180,7 @@ def push(self, data):

async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
async with self._write_queue_lock:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 async with seems to be the correct syntax to use that lock in terms of an asyncio context manager.

for chunk in chunks:
self._write_queue.put_nowait(chunk)

Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,4 @@ def is_windows():

notwindows = unittest.skipUnless(not is_windows(), "This test is not adequate for windows")
notpypy = unittest.skipUnless(not platform.python_implementation() == 'PyPy', "This tests is not suitable for pypy")
notasyncio = unittest.skipUnless(not EVENT_LOOP_MANAGER == 'asyncio', "This tests is not suitable for EVENT_LOOP_MANAGER=asyncio")
5 changes: 2 additions & 3 deletions tests/integration/cqlengine/model/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,9 @@ class SensitiveModel(Model):
rows[-1]
rows[-1:]

# Asyncio complains loudly about old syntax on python 3.7+, so get rid of all of those
relevant_warnings = [warn for warn in w if "with (yield from lock)" not in str(warn.message)]
# ignore DeprecationWarning('The loop argument is deprecated since Python 3.8, and scheduled for removal in Python 3.10.')
relevant_warnings = [warn for warn in w if "The loop argument is deprecated" not in str(warn.message)]

self.assertEqual(len(relevant_warnings), 4)
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[0].message))
self.assertIn("__table_name_case_sensitive__ will be removed in 4.0.", str(relevant_warnings[1].message))
self.assertIn("ModelQuerySet indexing with negative indices support will be removed in 4.0.",
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from cassandra import connection
from cassandra.connection import DefaultEndPoint

from tests import notwindows
from tests import notwindows, notasyncio
from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \
execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \
Expand Down Expand Up @@ -1108,6 +1108,7 @@ def test_add_profile_timeout(self):
raise Exception("add_execution_profile didn't timeout after {0} retries".format(max_retry_count))

@notwindows
@notasyncio # asyncio can't do timeouts smaller than 1ms, as this test requires
def test_execute_query_timeout(self):
with TestCluster() as cluster:
session = cluster.connect(wait_for_all_pools=True)
Expand Down