From 04fbea548b3e6299f2a3cc3cfb398e3ea57b706f Mon Sep 17 00:00:00 2001 From: Alex Walker Date: Thu, 27 Jan 2022 12:26:00 +0000 Subject: [PATCH] Don't delete response collectors in a transaction (#203) ## What is the goal of this PR? We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query. ## What are the changes implemented in this PR? See https://github.com/vaticle/typedb-client-python/pull/250, which this PR is a copy of. --- .grabl/automation.yml | 36 +++++++++--------- VERSION | 2 +- api/connection/TypeDBTransaction.ts | 4 +- package.json | 1 + stream/BidirectionalStream.ts | 10 +---- stream/ResponsePartIterator.ts | 1 - test/integration/test-stream.js | 59 +++++++++++++++++++++++++++++ 7 files changed, 83 insertions(+), 30 deletions(-) create mode 100644 test/integration/test-stream.js diff --git a/.grabl/automation.yml b/.grabl/automation.yml index 1546baee3..aa1374120 100644 --- a/.grabl/automation.yml +++ b/.grabl/automation.yml @@ -77,13 +77,13 @@ build: export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD bazel run @vaticle_dependencies//distribution/artifact:create-netrc .grabl/test-core.sh //test/behaviour/connection/... --test_output=errors --jobs=1 - test-behaviour-connection-cluster: - image: vaticle-ubuntu-21.04 - command: | - export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME - export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD - bazel run @vaticle_dependencies//distribution/artifact:create-netrc - .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1 +# test-behaviour-connection-cluster: +# image: vaticle-ubuntu-21.04 +# command: | +# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME +# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD +# bazel run @vaticle_dependencies//distribution/artifact:create-netrc +# .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1 test-behaviour-concept-core: image: vaticle-ubuntu-21.04 command: | @@ -107,14 +107,14 @@ build: bazel run @vaticle_dependencies//distribution/artifact:create-netrc .grabl/test-core.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 .grabl/test-core.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 - test-behaviour-match-cluster: - image: vaticle-ubuntu-21.04 - command: | - export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME - export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD - bazel run @vaticle_dependencies//distribution/artifact:create-netrc - .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 - .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 +# test-behaviour-match-cluster: +# image: vaticle-ubuntu-21.04 +# command: | +# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME +# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD +# bazel run @vaticle_dependencies//distribution/artifact:create-netrc +# .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 +# .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 test-behaviour-writable-core: image: vaticle-ubuntu-21.04 command: | @@ -156,9 +156,11 @@ build: image: vaticle-ubuntu-21.04 dependencies: [ build, test-integration, - test-behaviour-connection-core, test-behaviour-connection-cluster, + test-behaviour-connection-core, +# test-behaviour-connection-cluster, test-behaviour-concept-core, test-behaviour-concept-cluster, - test-behaviour-match-core, test-behaviour-match-cluster, + test-behaviour-match-core, +# test-behaviour-match-cluster, test-behaviour-writable-core, test-behaviour-writable-cluster, test-behaviour-definable-core, test-behaviour-definable-cluster ] diff --git a/VERSION b/VERSION index 6a6a3d8e3..097a15a2a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.6.1 +2.6.2 diff --git a/api/connection/TypeDBTransaction.ts b/api/connection/TypeDBTransaction.ts index 3e950285e..2eb8f7bdc 100644 --- a/api/connection/TypeDBTransaction.ts +++ b/api/connection/TypeDBTransaction.ts @@ -40,9 +40,9 @@ export interface TypeDBTransaction { readonly query: QueryManager; - commit(): void; + commit(): Promise; - rollback(): void; + rollback(): Promise; close(): Promise; } diff --git a/package.json b/package.json index d0c7029b9..07d4f93e0 100644 --- a/package.json +++ b/package.json @@ -19,6 +19,7 @@ "test-connection": "node test/integration/test-connection.js", "test-query": "node test/integration/test-query.js", "test-cluster-failover": "npm run build && node test/integration/test-cluster-failover.js", + "test-stream": "node test/integration/test-stream.js", "lint": "eslint . --ext .ts", "compile-tests": "tsc --build test/tsconfig.json" }, diff --git a/stream/BidirectionalStream.ts b/stream/BidirectionalStream.ts index 301a4ad7e..3ab9af9a9 100644 --- a/stream/BidirectionalStream.ts +++ b/stream/BidirectionalStream.ts @@ -63,11 +63,7 @@ export class BidirectionalStream { const responseQueue = this._responseCollector.queue(requestId); if (batch) this._dispatcher.dispatch(request); else this._dispatcher.dispatchNow(request); - try { - return await responseQueue.take() as Transaction.Res; - } finally { - this._responseCollector.remove(requestId); - } + return await responseQueue.take(); } stream(request: Transaction.Req): Stream { @@ -79,10 +75,6 @@ export class BidirectionalStream { return Stream.iterable(responseIterator); } - iteratorDone(requestId: string) { - this._responsePartCollector.remove(requestId); - } - isOpen(): boolean { return this._isOpen; } diff --git a/stream/ResponsePartIterator.ts b/stream/ResponsePartIterator.ts index 924e105dd..378540e32 100644 --- a/stream/ResponsePartIterator.ts +++ b/stream/ResponsePartIterator.ts @@ -60,7 +60,6 @@ export class ResponsePartIterator implements AsyncIterable case ResCase.STREAM_RES_PART : switch (res.getStreamResPart().getState()) { case Transaction.Stream.State.DONE: - this._stream.iteratorDone(this._requestId); return null; case Transaction.Stream.State.CONTINUE: this._stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(this._requestId)) diff --git a/test/integration/test-stream.js b/test/integration/test-stream.js new file mode 100644 index 000000000..3500ea04b --- /dev/null +++ b/test/integration/test-stream.js @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2021 Vaticle + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const { TypeDB, SessionType, TransactionType, TypeDBOptions } = require("../../dist"); +const TYPEDB = "typedb"; + +async function run() { + const client = TypeDB.coreClient(); + let session, tx; + try { + const typedb = (await client.databases.all()).find(x => x.name === TYPEDB); + if (typedb) await typedb.delete(); + await client.databases.create(TYPEDB); + + session = await client.session(TYPEDB, SessionType.SCHEMA); + tx = await session.transaction(TransactionType.WRITE); + + for (let i = 0; i < 51; i++) { + await tx.query.define(`define person sub entity, owns name${i}; name${i} sub attribute, value string;`); + } + await tx.commit(); + await session.close(); + + const txOptions = TypeDBOptions.core({ prefetch: true, prefetchSize: 50 }); + for (let i = 0; i < 50; i++) { + session = await client.session(TYPEDB, SessionType.DATA); + tx = await session.transaction(TransactionType.READ, txOptions); + const personType = (await tx.concepts.getEntityType("person")).asRemote(tx); + await personType.getOwns(false).collect(); + await tx.query.match("match $x sub thing; limit 1;").first(); + } + console.log("SUCCESS - completed 50 test runs"); + } catch (err) { + console.error(`ERROR: ${err.stack || err}`); + process.exit(1); + } finally { + await client.close(); + } +} + +run();