Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Store error on transaction stream and clean up completed query state (#…
Browse files Browse the repository at this point in the history
…202)

## What is the goal of this PR?

Multiplexed query streams over the transaction stream are cleaned up when complete. We also store any exceptions received against the transaction stream (as well as query streams) in order to propagate the error to any future transaction operations, not just open query streams.

## What are the changes implemented in this PR?

* Remove single and iterator response queues when they are finished
* store errors against query streams and transaction streams, so we can error on transaction operations against a closed transaction
  • Loading branch information
flyingsilverfin authored Jan 21, 2022
1 parent 27e5ff1 commit b0c26fe
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 69 deletions.
34 changes: 17 additions & 17 deletions .grabl/automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ build:
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/connection/... --test_output=errors --jobs=1
# test-behaviour-connection-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-connection-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
test-behaviour-concept-core:
image: vaticle-ubuntu-21.04
command: |
Expand All @@ -107,14 +107,14 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
.grabl/test-core.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
# test-behaviour-match-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
# .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
test-behaviour-match-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors --jobs=1
.grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors --jobs=1
test-behaviour-writable-core:
image: vaticle-ubuntu-21.04
command: |
Expand Down Expand Up @@ -156,9 +156,9 @@ build:
image: vaticle-ubuntu-21.04
dependencies: [
build, test-integration,
test-behaviour-connection-core, #test-behaviour-connection-cluster,
test-behaviour-connection-core, test-behaviour-connection-cluster,
test-behaviour-concept-core, test-behaviour-concept-cluster,
test-behaviour-match-core, #test-behaviour-match-cluster,
test-behaviour-match-core, test-behaviour-match-cluster,
test-behaviour-writable-core, test-behaviour-writable-cluster,
test-behaviour-definable-core, test-behaviour-definable-cluster
]
Expand Down
7 changes: 3 additions & 4 deletions connection/TypeDBTransactionImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import {LogicManagerImpl} from "../logic/LogicManagerImpl";
import {QueryManagerImpl} from "../query/QueryManagerImpl";
import {BidirectionalStream} from "../stream/BidirectionalStream";
import {TypeDBSessionImpl} from "./TypeDBSessionImpl";
import assert = require("assert");
import TRANSACTION_CLOSED = ErrorMessage.Client.TRANSACTION_CLOSED;
import TRANSACTION_CLOSED_WITH_ERRORS = ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS;
import ILLEGAL_STATE = ErrorMessage.Internal.ILLEGAL_STATE;
Expand Down Expand Up @@ -120,8 +119,8 @@ export class TypeDBTransactionImpl implements TypeDBTransaction.Extended {

private throwTransactionClosed(): void {
if (this.isOpen()) throw new TypeDBClientError(ILLEGAL_STATE);
const errors = this._bidirectionalStream.getErrors();
if (errors.length == 0) throw new TypeDBClientError(TRANSACTION_CLOSED);
else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(errors));
const error = this._bidirectionalStream.getError();
if (!error) throw new TypeDBClientError(TRANSACTION_CLOSED);
else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(error));
}
}
34 changes: 17 additions & 17 deletions query/QueryManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
* under the License.
*/

import { QueryManager as QueryProto } from "typedb-protocol/common/query_pb";
import { Transaction as TransactionProto } from "typedb-protocol/common/transaction_pb";
import { ConceptMap } from "../api/answer/ConceptMap";
import { ConceptMapGroup } from "../api/answer/ConceptMapGroup";
import { Numeric } from "../api/answer/Numeric";
import { NumericGroup } from "../api/answer/NumericGroup";
import { TypeDBOptions } from "../api/connection/TypeDBOptions";
import { TypeDBTransaction } from "../api/connection/TypeDBTransaction";
import { Explanation } from "../api/logic/Explanation";
import { QueryManager } from "../api/query/QueryManager";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { Stream } from "../common/util/Stream";
import { ConceptMapGroupImpl } from "../concept/answer/ConceptMapGroupImpl";
import { ConceptMapImpl } from "../concept/answer/ConceptMapImpl";
import { NumericGroupImpl } from "../concept/answer/NumericGroupImpl";
import { NumericImpl } from "../concept/answer/NumericImpl";
import { ExplanationImpl } from "../logic/ExplanationImpl";
import {QueryManager as QueryProto} from "typedb-protocol/common/query_pb";
import {Transaction as TransactionProto} from "typedb-protocol/common/transaction_pb";
import {ConceptMap} from "../api/answer/ConceptMap";
import {ConceptMapGroup} from "../api/answer/ConceptMapGroup";
import {Numeric} from "../api/answer/Numeric";
import {NumericGroup} from "../api/answer/NumericGroup";
import {TypeDBOptions} from "../api/connection/TypeDBOptions";
import {TypeDBTransaction} from "../api/connection/TypeDBTransaction";
import {Explanation} from "../api/logic/Explanation";
import {QueryManager} from "../api/query/QueryManager";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {Stream} from "../common/util/Stream";
import {ConceptMapGroupImpl} from "../concept/answer/ConceptMapGroupImpl";
import {ConceptMapImpl} from "../concept/answer/ConceptMapImpl";
import {NumericGroupImpl} from "../concept/answer/NumericGroupImpl";
import {NumericImpl} from "../concept/answer/NumericImpl";
import {ExplanationImpl} from "../logic/ExplanationImpl";


export class QueryManagerImpl implements QueryManager {
Expand Down
40 changes: 27 additions & 13 deletions stream/BidirectionalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
* under the License.
*/

import { ClientDuplexStream } from "@grpc/grpc-js";
import { Transaction } from "typedb-protocol/common/transaction_pb";
import {ClientDuplexStream} from "@grpc/grpc-js";
import {Transaction} from "typedb-protocol/common/transaction_pb";
import * as uuid from "uuid";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { TypeDBStub } from "../common/rpc/TypeDBStub";
import { Stream } from "../common/util/Stream";
import { BatchDispatcher, RequestTransmitter } from "./RequestTransmitter";
import { ResponseCollector } from "./ResponseCollector";
import { ResponsePartIterator } from "./ResponsePartIterator";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {TypeDBStub} from "../common/rpc/TypeDBStub";
import {Stream} from "../common/util/Stream";
import {BatchDispatcher, RequestTransmitter} from "./RequestTransmitter";
import {ResponseCollector} from "./ResponseCollector";
import {ResponsePartIterator} from "./ResponsePartIterator";
import MISSING_RESPONSE = ErrorMessage.Client.MISSING_RESPONSE;
import UNKNOWN_REQUEST_ID = ErrorMessage.Client.UNKNOWN_REQUEST_ID;
import ResponseQueue = ResponseCollector.ResponseQueue;
Expand All @@ -41,6 +41,7 @@ export class BidirectionalStream {
private readonly _responsePartCollector: ResponseCollector<Transaction.ResPart>;
private _stub: TypeDBStub;
private _isOpen: boolean;
private _error: Error | string;

constructor(stub: TypeDBStub, requestTransmitter: RequestTransmitter) {
this._requestTransmitter = requestTransmitter;
Expand All @@ -62,24 +63,33 @@ export class BidirectionalStream {
const responseQueue = this._responseCollector.queue(requestId);
if (batch) this._dispatcher.dispatch(request);
else this._dispatcher.dispatchNow(request);
return (await responseQueue.take() as Transaction.Res);
try {
return await responseQueue.take() as Transaction.Res;
} finally {
this._responseCollector.remove(requestId);
}
}

stream(request: Transaction.Req): Stream<Transaction.ResPart> {
const requestId = uuid.v4();
request.setReqId(uuid.parse(requestId) as Uint8Array);
const responseQueue = this._responsePartCollector.queue(requestId) as ResponseQueue<Transaction.ResPart>;
const responseIterator = new ResponsePartIterator(requestId, responseQueue, this._dispatcher);
const responseIterator = new ResponsePartIterator(requestId, responseQueue, this);
this._dispatcher.dispatch(request);
return Stream.iterable(responseIterator);
}

iteratorDone(requestId: string) {
this._responsePartCollector.remove(requestId);
}

isOpen(): boolean {
return this._isOpen;
}

async close(error?: Error | string): Promise<void> {
this._isOpen = false;
this._error = error;
this._responseCollector.close(error);
this._responsePartCollector.close(error);
this._dispatcher.close();
Expand Down Expand Up @@ -128,7 +138,11 @@ export class BidirectionalStream {
queue.put(res);
}

getErrors(): (Error|string)[] {
return (this._responseCollector.getErrors()).concat(this._responsePartCollector.getErrors());
dispatcher(): BatchDispatcher {
return this._dispatcher;
}

getError(): Error | string {
return this._error;
}
}
18 changes: 4 additions & 14 deletions stream/ResponseCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,12 @@ export class ResponseCollector<T> {
return this._response_queues[uuid];
}

close(error?: Error | string) {
Object.values(this._response_queues).forEach(collector => collector.close(error));
remove(requestId: string) {
delete this._response_queues[requestId];
}

getErrors(): (Error | string)[] {
const errors: (Error | string)[] = [];
for (const requestId in this._response_queues) {
const error = this._response_queues[requestId].getError();
if (error) errors.push(error);
delete this._response_queues[requestId];
}
return errors;
close(error?: Error | string) {
Object.values(this._response_queues).forEach(collector => collector.close(error));
}
}

Expand Down Expand Up @@ -86,10 +80,6 @@ export namespace ResponseCollector {
this._error = error;
this._queue.add(new Done());
}

getError(): string | Error {
return this._error;
}
}

class QueueElement {
Expand Down
11 changes: 7 additions & 4 deletions stream/ResponsePartIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ import { ResponseCollector } from "./ResponseCollector";
import MISSING_RESPONSE = ErrorMessage.Client.MISSING_RESPONSE;
import UNKNOWN_STREAM_STATE = ErrorMessage.Client.UNKNOWN_STREAM_STATE;
import ResCase = Transaction.ResPart.ResCase;
import {BidirectionalStream} from "./BidirectionalStream";

export class ResponsePartIterator implements AsyncIterable<Transaction.ResPart> {

private readonly _requestId: string;
private readonly _dispatcher: BatchDispatcher;
private readonly _responseCollector: ResponseCollector.ResponseQueue<Transaction.ResPart>;
private readonly _stream: BidirectionalStream;

constructor(requestId: string, responseCollector: ResponseCollector.ResponseQueue<Transaction.ResPart>,
dispatcher: BatchDispatcher) {
stream: BidirectionalStream) {
this._requestId = requestId;
this._dispatcher = dispatcher;
this._responseCollector = responseCollector;
this._stream = stream;
}

async* [Symbol.asyncIterator](): AsyncIterator<Transaction.ResPart, any, undefined> {
Expand All @@ -51,16 +52,18 @@ export class ResponsePartIterator implements AsyncIterable<Transaction.ResPart>
}

async next(): Promise<Transaction.ResPart> {
if (this._stream.getError()) throw this._stream.getError();
const res = await this._responseCollector.take();
switch (res.getResCase()) {
case ResCase.RES_NOT_SET:
throw new TypeDBClientError(MISSING_RESPONSE.message(this._requestId));
case ResCase.STREAM_RES_PART :
switch (res.getStreamResPart().getState()) {
case Transaction.Stream.State.DONE:
this._stream.iteratorDone(this._requestId);
return null;
case Transaction.Stream.State.CONTINUE:
this._dispatcher.dispatch(RequestBuilder.Transaction.streamReq(this._requestId))
this._stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(this._requestId))
return this.next();
default:
throw new TypeDBClientError(UNKNOWN_STREAM_STATE.message(res.getStreamResPart()));
Expand Down

0 comments on commit b0c26fe

Please sign in to comment.