Skip to content

Commit

Permalink
Accept args in execute() and use the new Execute message (#310)
Browse files Browse the repository at this point in the history
* Accept args in execute()
* Rename OptimisticExecute to Execute
* Implement execute() with io_format=DISCARD
* Rename IoFormat to OutputFormat
* Rename DISCARD to NONE
* Make simple_query legacy
  • Loading branch information
fantix authored Jun 13, 2022
1 parent 7a393ec commit 5298701
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 169 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
edgedb-version: [stable, nightly]
edgedb-version: [stable] # , nightly]
os: [ubuntu-latest, macos-latest, windows-2019]
loop: [asyncio, uvloop]
exclude:
Expand Down
37 changes: 26 additions & 11 deletions edgedb/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class QueryCache(typing.NamedTuple):


class QueryOptions(typing.NamedTuple):
io_format: protocol.IoFormat
output_format: protocol.OutputFormat
expect_one: bool
required_one: bool

Expand All @@ -41,33 +41,38 @@ class QueryContext(typing.NamedTuple):
retry_options: typing.Optional[options.RetryOptions]


class ScriptContext(typing.NamedTuple):
query: QueryWithArgs
cache: QueryCache


_query_opts = QueryOptions(
io_format=protocol.IoFormat.BINARY,
output_format=protocol.OutputFormat.BINARY,
expect_one=False,
required_one=False,
)
_query_single_opts = QueryOptions(
io_format=protocol.IoFormat.BINARY,
output_format=protocol.OutputFormat.BINARY,
expect_one=True,
required_one=False,
)
_query_required_single_opts = QueryOptions(
io_format=protocol.IoFormat.BINARY,
output_format=protocol.OutputFormat.BINARY,
expect_one=True,
required_one=True,
)
_query_json_opts = QueryOptions(
io_format=protocol.IoFormat.JSON,
output_format=protocol.OutputFormat.JSON,
expect_one=False,
required_one=False,
)
_query_single_json_opts = QueryOptions(
io_format=protocol.IoFormat.JSON,
output_format=protocol.OutputFormat.JSON,
expect_one=True,
required_one=False,
)
_query_required_single_json_opts = QueryOptions(
io_format=protocol.IoFormat.JSON,
output_format=protocol.OutputFormat.JSON,
expect_one=True,
required_one=True,
)
Expand Down Expand Up @@ -143,11 +148,16 @@ def query_required_single_json(self, query: str, *args, **kwargs) -> str:
retry_options=self._get_retry_options(),
))

# TODO(tailhook) add *args, **kwargs, when they are supported
@abc.abstractmethod
def execute(self, query: str) -> None:
def _execute(self, script: ScriptContext):
...

def execute(self, query: str, *args, **kwargs) -> None:
self._execute(ScriptContext(
query=QueryWithArgs(query, args, kwargs),
cache=self._get_query_cache(),
))


class Executor(ReadOnlyExecutor):
"""Subclasses can execute both read-only and modification queries"""
Expand Down Expand Up @@ -222,11 +232,16 @@ async def query_required_single_json(
retry_options=self._get_retry_options(),
))

# TODO(tailhook) add *args, **kwargs, when they are supported
@abc.abstractmethod
async def execute(self, query: str) -> None:
async def _execute(self, script: ScriptContext) -> None:
...

async def execute(self, query: str, *args, **kwargs) -> None:
await self._execute(ScriptContext(
query=QueryWithArgs(query, args, kwargs),
cache=self._get_query_cache(),
))


class AsyncIOExecutor(AsyncIOReadOnlyExecutor):
"""Subclasses can execute both read-only and modification queries"""
Expand Down
52 changes: 40 additions & 12 deletions edgedb/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,21 @@ async def connect(self, *, single_attempt=False):
iteration += 1
await self.sleep(0.01 + random.random() * 0.2)

async def privileged_execute(self, query):
await self._protocol.simple_query(query, enums.Capability.ALL)
async def privileged_execute(self, script: abstract.ScriptContext):
if self._protocol.is_legacy:
await self._protocol.legacy_simple_query(
script.query.query, enums.Capability.ALL
)
else:
await self._protocol.execute(
query=script.query.query,
args=script.query.args,
kwargs=script.query.kwargs,
reg=script.cache.codecs_registry,
qc=script.cache.query_cache,
output_format=protocol.OutputFormat.NONE,
allow_capabilities=enums.Capability.ALL,
)

def is_in_transaction(self) -> bool:
"""Return True if Connection is currently inside a transaction.
Expand All @@ -199,14 +212,14 @@ async def raw_query(self, query_context: abstract.QueryContext):
if self._protocol.is_legacy:
execute = self._protocol.legacy_execute_anonymous
else:
execute = self._protocol.execute_anonymous
execute = self._protocol.query
return await execute(
query=query_context.query.query,
args=query_context.query.args,
kwargs=query_context.query.kwargs,
reg=query_context.cache.codecs_registry,
qc=query_context.cache.query_cache,
io_format=query_context.query_options.io_format,
output_format=query_context.query_options.output_format,
expect_one=query_context.query_options.expect_one,
required_one=query_context.query_options.required_one,
allow_capabilities=enums.Capability.EXECUTE,
Expand All @@ -218,8 +231,8 @@ async def raw_query(self, query_context: abstract.QueryContext):
raise e
if capabilities is None:
cache_item = query_context.cache.query_cache.get(
query=query_context.query.query,
io_format=query_context.query_options.io_format,
query_context.query.query,
query_context.query_options.output_format,
implicit_limit=0,
inline_typenames=False,
inline_typeids=False,
Expand All @@ -241,10 +254,25 @@ async def raw_query(self, query_context: abstract.QueryContext):
await self.sleep(rule.backoff(i))
reconnect = self.is_closed()

async def execute(self, query: str) -> None:
await self._protocol.simple_query(
query, enums.Capability.EXECUTE
)
async def _execute(self, script: abstract.ScriptContext) -> None:
if self._protocol.is_legacy:
if script.query.args or script.query.kwargs:
raise errors.InterfaceError(
"Legacy protocol doesn't support arguments in execute()"
)
await self._protocol.legacy_simple_query(
script.query.query, enums.Capability.EXECUTE
)
else:
await self._protocol.execute(
query=script.query.query,
args=script.query.args,
kwargs=script.query.kwargs,
reg=script.cache.codecs_registry,
qc=script.cache.query_cache,
output_format=protocol.OutputFormat.NONE,
allow_capabilities=enums.Capability.EXECUTE,
)

def terminate(self):
if not self.is_closed():
Expand Down Expand Up @@ -689,10 +717,10 @@ async def _query(self, query_context: abstract.QueryContext):
finally:
await self._impl.release(con)

async def execute(self, query: str) -> None:
async def _execute(self, script: abstract.ScriptContext) -> None:
con = await self._impl.acquire()
try:
await con.execute(query)
await con._execute(script)
finally:
await self._impl.release(con)

Expand Down
8 changes: 4 additions & 4 deletions edgedb/blocking_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ async def _ensure_transaction(self):
def _query(self, query_context: abstract.QueryContext):
return self._client._iter_coroutine(super()._query(query_context))

def execute(self, query: str) -> None:
self._client._iter_coroutine(super().execute(query))
def _execute(self, script: abstract.ScriptContext) -> None:
self._client._iter_coroutine(super()._execute(script))


class Retry(transaction.BaseRetry):
Expand Down Expand Up @@ -301,8 +301,8 @@ def _iter_coroutine(self, coro):
def _query(self, query_context: abstract.QueryContext):
return self._iter_coroutine(super()._query(query_context))

def execute(self, query: str) -> None:
self._iter_coroutine(super().execute(query))
def _execute(self, script: abstract.ScriptContext) -> None:
self._iter_coroutine(super()._execute(script))

def ensure_connected(self):
self._iter_coroutine(self._impl.ensure_connected())
Expand Down
4 changes: 2 additions & 2 deletions edgedb/protocol/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ DEF PREPARE_MSG = b'P'
DEF PREPARE_COMPLETE_MSG = b'1'
DEF DESCRIBE_STMT_MSG = b'D'
DEF STMT_DATA_DESC_MSG = b'T'
DEF EXECUTE_MSG = b'E'
DEF OPTIMISTIC_EXECUTE_MSG = b'O'
DEF LEGACY_EXECUTE_MSG = b'E'
DEF EXECUTE_MSG = b'O'
DEF EXECUTE_SCRIPT_MSG = b'Q'
DEF TERMINATE_MSG = b'X'

Expand Down
12 changes: 9 additions & 3 deletions edgedb/protocol/protocol.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ include "./codecs/codecs.pxd"
ctypedef object (*decode_row_method)(BaseCodec, FRBuffer *buf)


cpdef enum IoFormat:
cpdef enum OutputFormat:
BINARY = b'b'
JSON = b'j'
JSON_ELEMENTS = b'J'
NONE = b'n'


cdef enum TransactionStatus:
Expand Down Expand Up @@ -71,7 +72,7 @@ cdef class QueryCodecsCache:
cdef:
LRUMapping queries

cdef set(self, str query, IoFormat io_format,
cdef set(self, str query, OutputFormat output_format,
int implicit_limit, bint inline_typenames, bint inline_typeids,
bint expect_one, bint has_na_cardinality,
BaseCodec in_type, BaseCodec out_type, int capabilities)
Expand Down Expand Up @@ -109,7 +110,12 @@ cdef class SansIOProtocol:
cdef parse_describe_type_message(self, CodecsRegistry reg)
cdef parse_type_data(self, CodecsRegistry reg)
cdef _amend_parse_error(
self, exc, IoFormat io_format, bint expect_one, bint required_one)
self,
exc,
OutputFormat output_format,
bint expect_one,
bint required_one,
)

cdef inline ignore_headers(self)
cdef dict parse_headers(self)
Expand Down
Loading

0 comments on commit 5298701

Please sign in to comment.