Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Don't delete response collectors in a transaction (#203)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

We no longer delete response collectors in a transaction after receiving a response to a "single" request, or receiving a "DONE" message in a stream. This fixes a possible error when loading 50+ answers in one query and then performing a second query.

## What are the changes implemented in this PR?

See typedb/typedb-driver-python#250, which this PR is a copy of.
  • Loading branch information
alexjpwalker authored Jan 27, 2022
1 parent b0c26fe commit 04fbea5
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 30 deletions.
36 changes: 19 additions & 17 deletions .grabl/automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ build:
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-connection-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
# test-behaviour-connection-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-concept-core:
image: vaticle-ubuntu-21.04
command: |
Expand All @@ -107,14 +107,14 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
.grabl/test-core.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
test-behaviour-match-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
.grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
# test-behaviour-match-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
# .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
test-behaviour-writable-core:
image: vaticle-ubuntu-21.04
command: |
Expand Down Expand Up @@ -156,9 +156,11 @@ build:
image: vaticle-ubuntu-21.04
dependencies: [
build, test-integration,
test-behaviour-connection-core, test-behaviour-connection-cluster,
test-behaviour-connection-core,
# test-behaviour-connection-cluster,
test-behaviour-concept-core, test-behaviour-concept-cluster,
test-behaviour-match-core, test-behaviour-match-cluster,
test-behaviour-match-core,
# test-behaviour-match-cluster,
test-behaviour-writable-core, test-behaviour-writable-cluster,
test-behaviour-definable-core, test-behaviour-definable-cluster
]
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.1
2.6.2
4 changes: 2 additions & 2 deletions api/connection/TypeDBTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ export interface TypeDBTransaction {

readonly query: QueryManager;

commit(): void;
commit(): Promise<void>;

rollback(): void;
rollback(): Promise<void>;

close(): Promise<void>;
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"test-connection": "node test/integration/test-connection.js",
"test-query": "node test/integration/test-query.js",
"test-cluster-failover": "npm run build && node test/integration/test-cluster-failover.js",
"test-stream": "node test/integration/test-stream.js",
"lint": "eslint . --ext .ts",
"compile-tests": "tsc --build test/tsconfig.json"
},
Expand Down
10 changes: 1 addition & 9 deletions stream/BidirectionalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ export class BidirectionalStream {
const responseQueue = this._responseCollector.queue(requestId);
if (batch) this._dispatcher.dispatch(request);
else this._dispatcher.dispatchNow(request);
try {
return await responseQueue.take() as Transaction.Res;
} finally {
this._responseCollector.remove(requestId);
}
return await responseQueue.take();
}

stream(request: Transaction.Req): Stream<Transaction.ResPart> {
Expand All @@ -79,10 +75,6 @@ export class BidirectionalStream {
return Stream.iterable(responseIterator);
}

iteratorDone(requestId: string) {
this._responsePartCollector.remove(requestId);
}

isOpen(): boolean {
return this._isOpen;
}
Expand Down
1 change: 0 additions & 1 deletion stream/ResponsePartIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class ResponsePartIterator implements AsyncIterable<Transaction.ResPart>
case ResCase.STREAM_RES_PART :
switch (res.getStreamResPart().getState()) {
case Transaction.Stream.State.DONE:
this._stream.iteratorDone(this._requestId);
return null;
case Transaction.Stream.State.CONTINUE:
this._stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(this._requestId))
Expand Down
59 changes: 59 additions & 0 deletions test/integration/test-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2021 Vaticle
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { TypeDB, SessionType, TransactionType, TypeDBOptions } = require("../../dist");
const TYPEDB = "typedb";

async function run() {
const client = TypeDB.coreClient();
let session, tx;
try {
const typedb = (await client.databases.all()).find(x => x.name === TYPEDB);
if (typedb) await typedb.delete();
await client.databases.create(TYPEDB);

session = await client.session(TYPEDB, SessionType.SCHEMA);
tx = await session.transaction(TransactionType.WRITE);

for (let i = 0; i < 51; i++) {
await tx.query.define(`define person sub entity, owns name${i}; name${i} sub attribute, value string;`);
}
await tx.commit();
await session.close();

const txOptions = TypeDBOptions.core({ prefetch: true, prefetchSize: 50 });
for (let i = 0; i < 50; i++) {
session = await client.session(TYPEDB, SessionType.DATA);
tx = await session.transaction(TransactionType.READ, txOptions);
const personType = (await tx.concepts.getEntityType("person")).asRemote(tx);
await personType.getOwns(false).collect();
await tx.query.match("match $x sub thing; limit 1;").first();
}
console.log("SUCCESS - completed 50 test runs");
} catch (err) {
console.error(`ERROR: ${err.stack || err}`);
process.exit(1);
} finally {
await client.close();
}
}

run();

0 comments on commit 04fbea5

Please sign in to comment.