From b0c26feefd311028829b4b000b1b6d3333c4965b Mon Sep 17 00:00:00 2001 From: Joshua Send Date: Fri, 21 Jan 2022 17:13:24 +0000 Subject: [PATCH] Store error on transaction stream and clean up completed query state (#202) ## What is the goal of this PR? Multiplexed query streams over the transaction stream are cleaned up when complete. We also store any exceptions received against the transaction stream (as well as query streams) in order to propagate the error to any future transaction operations, not just open query streams. ## What are the changes implemented in this PR? * Remove single and iterator response queues when they are finished * store errors against query streams and transaction streams, so we can error on transaction operations against a closed transaction --- .grabl/automation.yml | 34 ++++++++++++------------ connection/TypeDBTransactionImpl.ts | 7 +++-- query/QueryManagerImpl.ts | 34 ++++++++++++------------ stream/BidirectionalStream.ts | 40 +++++++++++++++++++---------- stream/ResponseCollector.ts | 18 +++---------- stream/ResponsePartIterator.ts | 11 +++++--- 6 files changed, 75 insertions(+), 69 deletions(-) diff --git a/.grabl/automation.yml b/.grabl/automation.yml index 5f6060e5b..1546baee3 100644 --- a/.grabl/automation.yml +++ b/.grabl/automation.yml @@ -77,13 +77,13 @@ build: export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD bazel run @vaticle_dependencies//distribution/artifact:create-netrc .grabl/test-core.sh //test/behaviour/connection/... --test_output=errors --jobs=1 -# test-behaviour-connection-cluster: -# image: vaticle-ubuntu-21.04 -# command: | -# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME -# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD -# bazel run @vaticle_dependencies//distribution/artifact:create-netrc -# .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1 + test-behaviour-connection-cluster: + image: vaticle-ubuntu-21.04 + command: | + export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME + export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD + bazel run @vaticle_dependencies//distribution/artifact:create-netrc + .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1 test-behaviour-concept-core: image: vaticle-ubuntu-21.04 command: | @@ -107,14 +107,14 @@ build: bazel run @vaticle_dependencies//distribution/artifact:create-netrc .grabl/test-core.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 .grabl/test-core.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 -# test-behaviour-match-cluster: -# image: vaticle-ubuntu-21.04 -# command: | -# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME -# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD -# bazel run @vaticle_dependencies//distribution/artifact:create-netrc -# .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 -# .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 + test-behaviour-match-cluster: + image: vaticle-ubuntu-21.04 + command: | + export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME + export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD + bazel run @vaticle_dependencies//distribution/artifact:create-netrc + .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1 + .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1 test-behaviour-writable-core: image: vaticle-ubuntu-21.04 command: | @@ -156,9 +156,9 @@ build: image: vaticle-ubuntu-21.04 dependencies: [ build, test-integration, - test-behaviour-connection-core, #test-behaviour-connection-cluster, + test-behaviour-connection-core, test-behaviour-connection-cluster, test-behaviour-concept-core, test-behaviour-concept-cluster, - test-behaviour-match-core, #test-behaviour-match-cluster, + test-behaviour-match-core, test-behaviour-match-cluster, test-behaviour-writable-core, test-behaviour-writable-cluster, test-behaviour-definable-core, test-behaviour-definable-cluster ] diff --git a/connection/TypeDBTransactionImpl.ts b/connection/TypeDBTransactionImpl.ts index e9c692fa2..f5ea5868a 100644 --- a/connection/TypeDBTransactionImpl.ts +++ b/connection/TypeDBTransactionImpl.ts @@ -34,7 +34,6 @@ import {LogicManagerImpl} from "../logic/LogicManagerImpl"; import {QueryManagerImpl} from "../query/QueryManagerImpl"; import {BidirectionalStream} from "../stream/BidirectionalStream"; import {TypeDBSessionImpl} from "./TypeDBSessionImpl"; -import assert = require("assert"); import TRANSACTION_CLOSED = ErrorMessage.Client.TRANSACTION_CLOSED; import TRANSACTION_CLOSED_WITH_ERRORS = ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS; import ILLEGAL_STATE = ErrorMessage.Internal.ILLEGAL_STATE; @@ -120,8 +119,8 @@ export class TypeDBTransactionImpl implements TypeDBTransaction.Extended { private throwTransactionClosed(): void { if (this.isOpen()) throw new TypeDBClientError(ILLEGAL_STATE); - const errors = this._bidirectionalStream.getErrors(); - if (errors.length == 0) throw new TypeDBClientError(TRANSACTION_CLOSED); - else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(errors)); + const error = this._bidirectionalStream.getError(); + if (!error) throw new TypeDBClientError(TRANSACTION_CLOSED); + else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(error)); } } diff --git a/query/QueryManagerImpl.ts b/query/QueryManagerImpl.ts index 5a5f270f5..18f82745c 100644 --- a/query/QueryManagerImpl.ts +++ b/query/QueryManagerImpl.ts @@ -19,23 +19,23 @@ * under the License. */ -import { QueryManager as QueryProto } from "typedb-protocol/common/query_pb"; -import { Transaction as TransactionProto } from "typedb-protocol/common/transaction_pb"; -import { ConceptMap } from "../api/answer/ConceptMap"; -import { ConceptMapGroup } from "../api/answer/ConceptMapGroup"; -import { Numeric } from "../api/answer/Numeric"; -import { NumericGroup } from "../api/answer/NumericGroup"; -import { TypeDBOptions } from "../api/connection/TypeDBOptions"; -import { TypeDBTransaction } from "../api/connection/TypeDBTransaction"; -import { Explanation } from "../api/logic/Explanation"; -import { QueryManager } from "../api/query/QueryManager"; -import { RequestBuilder } from "../common/rpc/RequestBuilder"; -import { Stream } from "../common/util/Stream"; -import { ConceptMapGroupImpl } from "../concept/answer/ConceptMapGroupImpl"; -import { ConceptMapImpl } from "../concept/answer/ConceptMapImpl"; -import { NumericGroupImpl } from "../concept/answer/NumericGroupImpl"; -import { NumericImpl } from "../concept/answer/NumericImpl"; -import { ExplanationImpl } from "../logic/ExplanationImpl"; +import {QueryManager as QueryProto} from "typedb-protocol/common/query_pb"; +import {Transaction as TransactionProto} from "typedb-protocol/common/transaction_pb"; +import {ConceptMap} from "../api/answer/ConceptMap"; +import {ConceptMapGroup} from "../api/answer/ConceptMapGroup"; +import {Numeric} from "../api/answer/Numeric"; +import {NumericGroup} from "../api/answer/NumericGroup"; +import {TypeDBOptions} from "../api/connection/TypeDBOptions"; +import {TypeDBTransaction} from "../api/connection/TypeDBTransaction"; +import {Explanation} from "../api/logic/Explanation"; +import {QueryManager} from "../api/query/QueryManager"; +import {RequestBuilder} from "../common/rpc/RequestBuilder"; +import {Stream} from "../common/util/Stream"; +import {ConceptMapGroupImpl} from "../concept/answer/ConceptMapGroupImpl"; +import {ConceptMapImpl} from "../concept/answer/ConceptMapImpl"; +import {NumericGroupImpl} from "../concept/answer/NumericGroupImpl"; +import {NumericImpl} from "../concept/answer/NumericImpl"; +import {ExplanationImpl} from "../logic/ExplanationImpl"; export class QueryManagerImpl implements QueryManager { diff --git a/stream/BidirectionalStream.ts b/stream/BidirectionalStream.ts index c7a1f1978..301a4ad7e 100644 --- a/stream/BidirectionalStream.ts +++ b/stream/BidirectionalStream.ts @@ -19,16 +19,16 @@ * under the License. */ -import { ClientDuplexStream } from "@grpc/grpc-js"; -import { Transaction } from "typedb-protocol/common/transaction_pb"; +import {ClientDuplexStream} from "@grpc/grpc-js"; +import {Transaction} from "typedb-protocol/common/transaction_pb"; import * as uuid from "uuid"; -import { ErrorMessage } from "../common/errors/ErrorMessage"; -import { TypeDBClientError } from "../common/errors/TypeDBClientError"; -import { TypeDBStub } from "../common/rpc/TypeDBStub"; -import { Stream } from "../common/util/Stream"; -import { BatchDispatcher, RequestTransmitter } from "./RequestTransmitter"; -import { ResponseCollector } from "./ResponseCollector"; -import { ResponsePartIterator } from "./ResponsePartIterator"; +import {ErrorMessage} from "../common/errors/ErrorMessage"; +import {TypeDBClientError} from "../common/errors/TypeDBClientError"; +import {TypeDBStub} from "../common/rpc/TypeDBStub"; +import {Stream} from "../common/util/Stream"; +import {BatchDispatcher, RequestTransmitter} from "./RequestTransmitter"; +import {ResponseCollector} from "./ResponseCollector"; +import {ResponsePartIterator} from "./ResponsePartIterator"; import MISSING_RESPONSE = ErrorMessage.Client.MISSING_RESPONSE; import UNKNOWN_REQUEST_ID = ErrorMessage.Client.UNKNOWN_REQUEST_ID; import ResponseQueue = ResponseCollector.ResponseQueue; @@ -41,6 +41,7 @@ export class BidirectionalStream { private readonly _responsePartCollector: ResponseCollector; private _stub: TypeDBStub; private _isOpen: boolean; + private _error: Error | string; constructor(stub: TypeDBStub, requestTransmitter: RequestTransmitter) { this._requestTransmitter = requestTransmitter; @@ -62,24 +63,33 @@ export class BidirectionalStream { const responseQueue = this._responseCollector.queue(requestId); if (batch) this._dispatcher.dispatch(request); else this._dispatcher.dispatchNow(request); - return (await responseQueue.take() as Transaction.Res); + try { + return await responseQueue.take() as Transaction.Res; + } finally { + this._responseCollector.remove(requestId); + } } stream(request: Transaction.Req): Stream { const requestId = uuid.v4(); request.setReqId(uuid.parse(requestId) as Uint8Array); const responseQueue = this._responsePartCollector.queue(requestId) as ResponseQueue; - const responseIterator = new ResponsePartIterator(requestId, responseQueue, this._dispatcher); + const responseIterator = new ResponsePartIterator(requestId, responseQueue, this); this._dispatcher.dispatch(request); return Stream.iterable(responseIterator); } + iteratorDone(requestId: string) { + this._responsePartCollector.remove(requestId); + } + isOpen(): boolean { return this._isOpen; } async close(error?: Error | string): Promise { this._isOpen = false; + this._error = error; this._responseCollector.close(error); this._responsePartCollector.close(error); this._dispatcher.close(); @@ -128,7 +138,11 @@ export class BidirectionalStream { queue.put(res); } - getErrors(): (Error|string)[] { - return (this._responseCollector.getErrors()).concat(this._responsePartCollector.getErrors()); + dispatcher(): BatchDispatcher { + return this._dispatcher; + } + + getError(): Error | string { + return this._error; } } diff --git a/stream/ResponseCollector.ts b/stream/ResponseCollector.ts index a801af38f..4ded210a7 100644 --- a/stream/ResponseCollector.ts +++ b/stream/ResponseCollector.ts @@ -41,18 +41,12 @@ export class ResponseCollector { return this._response_queues[uuid]; } - close(error?: Error | string) { - Object.values(this._response_queues).forEach(collector => collector.close(error)); + remove(requestId: string) { + delete this._response_queues[requestId]; } - getErrors(): (Error | string)[] { - const errors: (Error | string)[] = []; - for (const requestId in this._response_queues) { - const error = this._response_queues[requestId].getError(); - if (error) errors.push(error); - delete this._response_queues[requestId]; - } - return errors; + close(error?: Error | string) { + Object.values(this._response_queues).forEach(collector => collector.close(error)); } } @@ -86,10 +80,6 @@ export namespace ResponseCollector { this._error = error; this._queue.add(new Done()); } - - getError(): string | Error { - return this._error; - } } class QueueElement { diff --git a/stream/ResponsePartIterator.ts b/stream/ResponsePartIterator.ts index d43470dc4..924e105dd 100644 --- a/stream/ResponsePartIterator.ts +++ b/stream/ResponsePartIterator.ts @@ -28,18 +28,19 @@ import { ResponseCollector } from "./ResponseCollector"; import MISSING_RESPONSE = ErrorMessage.Client.MISSING_RESPONSE; import UNKNOWN_STREAM_STATE = ErrorMessage.Client.UNKNOWN_STREAM_STATE; import ResCase = Transaction.ResPart.ResCase; +import {BidirectionalStream} from "./BidirectionalStream"; export class ResponsePartIterator implements AsyncIterable { private readonly _requestId: string; - private readonly _dispatcher: BatchDispatcher; private readonly _responseCollector: ResponseCollector.ResponseQueue; + private readonly _stream: BidirectionalStream; constructor(requestId: string, responseCollector: ResponseCollector.ResponseQueue, - dispatcher: BatchDispatcher) { + stream: BidirectionalStream) { this._requestId = requestId; - this._dispatcher = dispatcher; this._responseCollector = responseCollector; + this._stream = stream; } async* [Symbol.asyncIterator](): AsyncIterator { @@ -51,6 +52,7 @@ export class ResponsePartIterator implements AsyncIterable } async next(): Promise { + if (this._stream.getError()) throw this._stream.getError(); const res = await this._responseCollector.take(); switch (res.getResCase()) { case ResCase.RES_NOT_SET: @@ -58,9 +60,10 @@ export class ResponsePartIterator implements AsyncIterable case ResCase.STREAM_RES_PART : switch (res.getStreamResPart().getState()) { case Transaction.Stream.State.DONE: + this._stream.iteratorDone(this._requestId); return null; case Transaction.Stream.State.CONTINUE: - this._dispatcher.dispatch(RequestBuilder.Transaction.streamReq(this._requestId)) + this._stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(this._requestId)) return this.next(); default: throw new TypeDBClientError(UNKNOWN_STREAM_STATE.message(res.getStreamResPart()));