Skip to content

Commit

Permalink
Implementation refactor for Client-based interface (#286)
Browse files Browse the repository at this point in the history
* Implementation refactor with dropping old code for non-Client APIs
* Implemented blocking client
* Don't acquire loop before the first acquire
* Protect the first pool connection with a lock
* Drop on_connect, on_acquire and on_release hooks
* Rename concurrnecy to max_concurrency
  • Loading branch information
fantix authored Feb 2, 2022
1 parent be2c868 commit 33175f1
Show file tree
Hide file tree
Showing 30 changed files with 3,133 additions and 2,894 deletions.
4 changes: 0 additions & 4 deletions docs/api/asyncio_con.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ Client connection pool

.. py:function:: create_async_client(dsn=None, *, \
host=None, port=None, \
admin=None, \
user=None, password=None, \
database=None, \
timeout=60, \
Expand Down Expand Up @@ -100,9 +99,6 @@ Client connection pool
or the value of the ``EDGEDB_PORT`` environment variable, or ``5656``
if neither is specified.

:param admin:
If ``True``, try to connect to the special administration socket.

:param user:
The name of the database role used for authentication.

Expand Down
9 changes: 5 additions & 4 deletions edgedb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@
)
from edgedb.datatypes.datatypes import Set, Object, Array, Link, LinkSet

from .abstract import Executor, AsyncIOExecutor
from .abstract import (
Executor, AsyncIOExecutor, ReadOnlyExecutor, AsyncIOReadOnlyExecutor
)

from .asyncio_con import async_connect_raw, AsyncIOConnection
from .asyncio_pool import (
from .asyncio_client import (
create_async_client,
AsyncIOClient
)

from .blocking_con import connect, BlockingIOConnection
from .blocking_client import create_client, Client
from .options import RetryCondition, IsolationLevel, default_backoff
from .options import RetryOptions, TransactionOptions

Expand Down
110 changes: 77 additions & 33 deletions edgedb/_testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import unittest

import edgedb
from edgedb import asyncio_client
from edgedb import blocking_client


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -163,18 +165,19 @@ def _start_cluster(*, cleanup_atexit=True):
else:
con_args['tls_ca_file'] = data['tls_cert_file']

con = edgedb.connect(password='test', **con_args)
client = edgedb.create_client(password='test', **con_args)
client.ensure_connected()
_default_cluster = {
'proc': p,
'con': con,
'client': client,
'con_args': con_args,
}

if 'tls_cert_file' in data:
# Keep the temp dir which we also copied the cert from WSL
_default_cluster['_tmpdir'] = tmpdir

atexit.register(con.close)
atexit.register(client.close)
except Exception as e:
_default_cluster = e
raise e
Expand Down Expand Up @@ -225,7 +228,7 @@ def wrapper(self, *args, __meth__=meth, **kwargs):
if try_no == 3:
raise
else:
self.loop.run_until_complete(self.con.execute(
self.loop.run_until_complete(self.client.execute(
'ROLLBACK;'
))
try_no += 1
Expand Down Expand Up @@ -319,17 +322,43 @@ def setUpClass(cls):
cls.cluster = _start_cluster(cleanup_atexit=True)


class TestAsyncIOClient(edgedb.AsyncIOClient):
def _clear_codecs_cache(self):
self._impl.codecs_registry.clear_cache()

@property
def connection(self):
return self._impl._holders[0]._con

@property
def dbname(self):
return self._impl._working_params.database


class TestClient(edgedb.Client):
@property
def connection(self):
return self._impl._holders[0]._con


class ConnectedTestCaseMixin:

@classmethod
async def connect(cls, *,
cluster=None,
database='edgedb',
user='edgedb',
password='test'):
def test_client(
cls, *,
cluster=None,
database='edgedb',
user='edgedb',
password='test',
connection_class=asyncio_client.AsyncIOConnection,
):
conargs = cls.get_connect_args(
cluster=cluster, database=database, user=user, password=password)
return await edgedb.async_connect_raw(**conargs)
return TestAsyncIOClient(
connection_class=connection_class,
max_concurrency=1,
**conargs,
)

@classmethod
def get_connect_args(cls, *,
Expand Down Expand Up @@ -358,33 +387,34 @@ class DatabaseTestCase(ClusterTestCase, ConnectedTestCaseMixin):
INTERNAL_TESTMODE = True

BASE_TEST_CLASS = True
TEARDOWN_RETRY_DROP_DB = 1

def setUp(self):
if self.INTERNAL_TESTMODE:
self.loop.run_until_complete(
self.con.execute(
self.client.execute(
'CONFIGURE SESSION SET __internal_testmode := true;'))

if self.SETUP_METHOD:
self.loop.run_until_complete(
self.con.execute(self.SETUP_METHOD))
self.client.execute(self.SETUP_METHOD))

super().setUp()

def tearDown(self):
try:
if self.TEARDOWN_METHOD:
self.loop.run_until_complete(
self.con.execute(self.TEARDOWN_METHOD))
self.client.execute(self.TEARDOWN_METHOD))
finally:
try:
if self.con.is_in_transaction():
if self.client.connection.is_in_transaction():
raise AssertionError(
'test connection is still in transaction '
'*after* the test')

self.loop.run_until_complete(
self.con.execute('RESET ALIAS *;'))
self.client.execute('RESET ALIAS *;'))

finally:
super().tearDown()
Expand All @@ -394,26 +424,25 @@ def setUpClass(cls):
super().setUpClass()
dbname = cls.get_database_name()

cls.admin_conn = None
cls.con = None
cls.admin_client = None

class_set_up = os.environ.get('EDGEDB_TEST_CASES_SET_UP')

# Only open an extra admin connection if necessary.
if not class_set_up:
script = f'CREATE DATABASE {dbname};'
cls.admin_conn = cls.loop.run_until_complete(cls.connect())
cls.loop.run_until_complete(cls.admin_conn.execute(script))
cls.admin_client = cls.test_client()
cls.loop.run_until_complete(cls.admin_client.execute(script))

cls.con = cls.loop.run_until_complete(cls.connect(database=dbname))
cls.client = cls.test_client(database=dbname)

if not class_set_up:
script = cls.get_setup_script()
if script:
# The setup is expected to contain a CREATE MIGRATION,
# which needs to be wrapped in a transaction.
async def execute():
async for tr in cls.con.transaction():
async for tr in cls.client.transaction():
async with tr:
await tr.execute(script)
cls.loop.run_until_complete(execute())
Expand Down Expand Up @@ -482,27 +511,37 @@ def tearDownClass(cls):
try:
if script:
cls.loop.run_until_complete(
cls.con.execute(script))
cls.client.execute(script))
finally:
try:
cls.loop.run_until_complete(cls.con.aclose())
cls.loop.run_until_complete(cls.client.aclose())

if not class_set_up:
dbname = cls.get_database_name()
script = f'DROP DATABASE {dbname};'

cls.loop.run_until_complete(
cls.admin_conn.execute(script))
retry = cls.TEARDOWN_RETRY_DROP_DB
for i in range(retry):
try:
cls.loop.run_until_complete(
cls.admin_client.execute(script))
except edgedb.errors.ExecutionError:
if i < retry - 1:
time.sleep(0.1)
else:
raise
except edgedb.errors.UnknownDatabaseError:
break

except Exception:
log.exception('error running teardown')
# skip the exception so that original error is shown instead
# of finalizer error
finally:
try:
if cls.admin_conn is not None:
if cls.admin_client is not None:
cls.loop.run_until_complete(
cls.admin_conn.aclose())
cls.admin_client.aclose())
finally:
super().tearDownClass()

Expand All @@ -513,23 +552,28 @@ class AsyncQueryTestCase(DatabaseTestCase):

class SyncQueryTestCase(DatabaseTestCase):
BASE_TEST_CLASS = True
TEARDOWN_RETRY_DROP_DB = 5

def setUp(self):
super().setUp()

cls = type(self)
cls.async_con = cls.con
cls.async_client = cls.client

conargs = cls.get_connect_args().copy()
conargs.update(dict(database=cls.async_con.dbname))
conargs.update(dict(database=cls.async_client.dbname))

cls.con = edgedb.connect(**conargs)
cls.client = TestClient(
connection_class=blocking_client.BlockingIOConnection,
max_concurrency=1,
**conargs
)

def tearDown(self):
cls = type(self)
cls.con.close()
cls.con = cls.async_con
del cls.async_con
cls.client.close()
cls.client = cls.async_client
del cls.async_client


_lock_cnt = 0
Expand Down
Loading

0 comments on commit 33175f1

Please sign in to comment.