Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Don't delete response collectors in a transaction #250

Merged
merged 4 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions .grabl/automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ build:
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporarily comment these tests while they are failing due to unreleased Cluster

bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-connection-cluster:
image: vaticle-ubuntu-21.04
type: foreground
command: |
pyenv global 3.6.10
pip3 install -U pip
pip install -r requirements_dev.txt
sudo unlink /usr/bin/python3
sudo ln -s $(which python3) /usr/bin/python3
sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
# test-behaviour-connection-cluster:
# image: vaticle-ubuntu-21.04
# type: foreground
# command: |
# pyenv global 3.6.10
# pip3 install -U pip
# pip install -r requirements_dev.txt
# sudo unlink /usr/bin/python3
# sudo ln -s $(which python3) /usr/bin/python3
# sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //tests/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-concept-core:
image: vaticle-ubuntu-21.04
type: foreground
Expand Down Expand Up @@ -121,21 +121,21 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //tests/behaviour/typeql/language/match/... --test_output=errors
.grabl/test-core.sh //tests/behaviour/typeql/language/get/... --test_output=errors
test-behaviour-match-cluster:
image: vaticle-ubuntu-21.04
type: foreground
command: |
pyenv global 3.6.10
pip3 install -U pip
pip install -r requirements_dev.txt
sudo unlink /usr/bin/python3
sudo ln -s $(which python3) /usr/bin/python3
sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //tests/behaviour/typeql/language/match/... --test_output=errors
.grabl/test-cluster.sh //tests/behaviour/typeql/language/get/... --test_output=errors
# test-behaviour-match-cluster:
# image: vaticle-ubuntu-21.04
# type: foreground
# command: |
# pyenv global 3.6.10
# pip3 install -U pip
# pip install -r requirements_dev.txt
# sudo unlink /usr/bin/python3
# sudo ln -s $(which python3) /usr/bin/python3
# sudo ln -s /usr/share/pyshared/lsb_release.py /opt/pyenv/versions/3.6.10/lib/python3.6/site-packages/lsb_release.py
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //tests/behaviour/typeql/language/match/... --test_output=errors
# .grabl/test-cluster.sh //tests/behaviour/typeql/language/get/... --test_output=errors
test-behaviour-writable-core:
image: vaticle-ubuntu-21.04
type: foreground
Expand Down Expand Up @@ -212,14 +212,16 @@ build:
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
bazel test //tests:test_cluster_failover --test_output=errors
bazel test //tests/integration:test_cluster_failover --test_output=errors
deploy-pip-snapshot:
image: vaticle-ubuntu-21.04
dependencies: [
build,
test-behaviour-connection-core, test-behaviour-connection-cluster,
test-behaviour-connection-core,
# test-behaviour-connection-cluster,
test-behaviour-concept-core, test-behaviour-concept-cluster,
test-behaviour-match-core, test-behaviour-match-cluster,
test-behaviour-match-core,
# test-behaviour-match-cluster,
test-behaviour-writable-core, test-behaviour-writable-cluster,
test-behaviour-definable-core, test-behaviour-definable-cluster,
test-failover-cluster
Expand Down
36 changes: 0 additions & 36 deletions tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ load("//tools:cluster_test_rule.bzl", "typedb_cluster_py_test")
load("@vaticle_bazel_distribution//artifact:rules.bzl", "artifact_extractor")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We restructured the test package a little, introducing a specific BUILD file for the integration tests only which did not exist before

load("@vaticle_typedb_common//test:rules.bzl", "native_typedb_artifact")
load("@vaticle_dependencies//tool/checkstyle:rules.bzl", "checkstyle_test")
load("@rules_python//python:defs.bzl", "py_library", "py_test")

native_typedb_artifact(
name = "native-typedb-artifact",
Expand All @@ -49,46 +48,11 @@ checkstyle_test(
include = glob([
"*",
"deployment/*",
"integration/*",
]),
license_type = "apache",
size = "small",
)

py_test(
name = "test_debug",
srcs = [
"integration/test_debug.py",
],
deps = [
"//:client_python",
],
python_version = "PY3"
)

typedb_cluster_py_test(
name = "test_cluster_failover",
srcs = [
"integration/test_cluster_failover.py",
],
deps = [
"//:client_python",
],
size = "medium",
native_typedb_cluster_artifact = ":native-typedb-cluster-artifact",
)

py_test(
name = "test_concurrent",
srcs = [
"integration/test_concurrent.py",
],
deps = [
"//:client_python",
],
python_version = "PY3"
)

artifact_extractor(
name = "typedb-extractor",
artifact = ":native-typedb-artifact",
Expand Down
53 changes: 53 additions & 0 deletions tests/integration/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new BUILD file that was introduced specifically for integration tests

# Copyright (C) 2021 Vaticle
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

load("//tools:cluster_test_rule.bzl", "typedb_cluster_py_test")
load("@vaticle_dependencies//tool/checkstyle:rules.bzl", "checkstyle_test")
load("@rules_python//python:defs.bzl", "py_test")

typedb_cluster_py_test(
name = "test_cluster_failover",
srcs = ["test_cluster_failover.py"],
deps = ["//:client_python"],
size = "medium",
native_typedb_cluster_artifact = "//tests:native-typedb-cluster-artifact",
)

py_test(
name = "test_debug",
srcs = ["test_debug.py"],
deps = ["//:client_python"],
python_version = "PY3"
)

py_test(
name = "test_stream",
srcs = ["test_stream.py"],
deps = ["//:client_python"],
python_version = "PY3"
)

checkstyle_test(
name = "checkstyle",
include = glob(["*"]),
license_type = "apache",
size = "small",
)
55 changes: 0 additions & 55 deletions tests/integration/test_concurrent.py

This file was deleted.

59 changes: 59 additions & 0 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We deleted test_concurrent because it had no purpose other than a debugging playground, and we already have test_debug for that. Now we introduce test_stream, with the goal of one day running it in CI as it contains a useful test case (that failed without this PR, but passes with this PR). However we're not adding it to CI just yet as there is substantial technical debt in the test infrastructure blocking it and we have other priorities.

# Copyright (C) 2021 Vaticle
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import unittest
from unittest import TestCase

from typedb.client import *

TYPEDB = "typedb"
SCHEMA = SessionType.SCHEMA
DATA = SessionType.DATA
READ = TransactionType.READ
WRITE = TransactionType.WRITE


class TestStream(TestCase):

def setUp(self):
with TypeDB.core_client("127.0.0.1:1729") as client:
if TYPEDB not in [db.name() for db in client.databases().all()]:
client.databases().create(TYPEDB)

def test_multiple_done_response_handling(self):
with TypeDB.core_client(TypeDB.DEFAULT_ADDRESS) as client:
with client.session(TYPEDB, SCHEMA) as session, session.transaction(WRITE) as tx:
for i in range(51):
tx.query().define(f"define person sub entity, owns name{i}; name{i} sub attribute, value string;")
tx.commit()
# With these options (the default in TypeDB at time of writing), the server may respond with:
# 50 answers -> CONTINUE -> 1 answer [compensating for latency] -> DONE. The client will respond to
# CONTINUE with STREAM to keep iterating, and the server responds to STREAM with a 2nd DONE message.
# This is expected and should be handled correctly (ie: ignored) by the client.
tx_options = TypeDBOptions.core().set_prefetch(True).set_prefetch_size(50)
for i in range(50):
with client.session(TYPEDB, DATA) as session, session.transaction(READ, tx_options) as tx:
person_type = tx.concepts().get_thing_type("person").as_entity_type().as_remote(tx)
_attrs = list(person_type.get_owns(keys_only=False))
next(tx.query().match("match $x sub thing; limit 1;"))

if __name__ == "__main__":
unittest.main(verbosity=2)
2 changes: 1 addition & 1 deletion typedb/common/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, code: int, message: str):
MISSING_DB_NAME = ClientErrorMessage(7, "Database name cannot be empty.")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If UNKNOWN_REQUEST_ID had printed out the response, we could've saved some time debugging this time round. We're now printing out the response to aid debugging in the future.

Of course, if it ends up printing sensitive data, the reporter can mask it when reporting the error.

DB_DOES_NOT_EXIST = ClientErrorMessage(8, "The database '%s' does not exist.")
MISSING_RESPONSE = ClientErrorMessage(9, "Unexpected empty response for request ID '%s'.")
UNKNOWN_REQUEST_ID = ClientErrorMessage(10, "Received a response with unknown request id '%s'.")
UNKNOWN_REQUEST_ID = ClientErrorMessage(10, "Received a response with unknown request id '%s':\n%s")
CLUSTER_NO_PRIMARY_REPLICA_YET = ClientErrorMessage(11, "No replica has been marked as the primary replica for latest known term '%d'.")
CLUSTER_UNABLE_TO_CONNECT = ClientErrorMessage(12, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.")
CLUSTER_REPLICA_NOT_PRIMARY = ClientErrorMessage(13, "The replica is not the primary replica.")
Expand Down
6 changes: 1 addition & 5 deletions typedb/stream/bidirectional_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def stream(self, req: transaction_proto.Transaction.Req) -> Iterator[transaction
self._dispatcher.dispatch(req)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we remove done from BidirectionalStream. Response Collectors will now be bound to the lifetime of the transaction. We noticed this is also more in line with the terminology we use, where you "close" a Collector precisely when the enclosing transaction is closed.

return ResponsePartIterator(request_id, self)

def done(self, request_id: UUID):
self._response_collector.remove(request_id)

def is_open(self) -> bool:
return self._is_open.get()

Expand Down Expand Up @@ -103,7 +100,7 @@ def _collect(self, response: Union[transaction_proto.Transaction.Res, transactio
if collector:
collector.put(response)
else:
raise TypeDBClientException.of(UNKNOWN_REQUEST_ID, request_id)
raise TypeDBClientException.of(UNKNOWN_REQUEST_ID, (request_id, str(response)))

def dispatcher(self):
return self._dispatcher
Expand Down Expand Up @@ -137,7 +134,6 @@ def __init__(self, request_id: UUID, stream: "BidirectionalStream"):

def get(self) -> T:
value = self._stream.fetch(self._request_id)
self._stream.done(self._request_id)
return value


Expand Down
10 changes: 5 additions & 5 deletions typedb/stream/response_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ def get(self, block: bool) -> R:
response = self._response_queue.get(block=block)
if response.is_value():
return response.value
elif response.is_done() and self._error is None:
raise TypeDBClientException.of(TRANSACTION_CLOSED)
elif response.is_done() and self._error is not None:
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error)
elif response.is_done():
self._raise_transaction_closed_error()
else:
raise TypeDBClientException.of(ILLEGAL_STATE)

def _raise_transaction_closed_error(self):
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) if self._error else TypeDBClientException.of(TRANSACTION_CLOSED)

def put(self, response: R):
self._response_queue.put(ValueResponse(response))

Expand All @@ -79,7 +80,6 @@ def close(self, error: Optional[TypeDBClientException]):
self._response_queue.put(DoneResponse())



class Response:

def is_value(self):
Expand Down
1 change: 0 additions & 1 deletion typedb/stream/response_part_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def __next__(self) -> transaction_proto.Transaction.ResPart:
if self._bidirectional_stream.get_error() is not None:
raise self._bidirectional_stream.get_error()
elif not self._has_next():
self._bidirectional_stream.done(self._request_id)
raise StopIteration
else:
self._state = ResponsePartIterator.State.EMPTY
Expand Down