Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement and test configurable transaction timeouts #197

Merged
merged 41 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
213fae6
update deps
flyingsilverfin Dec 21, 2021
5caed4a
Implement timeout option and error draining
flyingsilverfin Dec 21, 2021
1ad353b
Implement new steps for session and transaction options
flyingsilverfin Dec 21, 2021
5edba1b
fix test runner and imports
flyingsilverfin Dec 21, 2021
929d7f8
use property setters
flyingsilverfin Dec 21, 2021
892b74a
Errors are captured on receive
flyingsilverfin Jan 10, 2022
a93032e
fix linter
flyingsilverfin Jan 10, 2022
7eed859
cleanup and bump to protocol from master
flyingsilverfin Jan 10, 2022
e77b84a
revert yarn.lock
flyingsilverfin Jan 10, 2022
4de4eb9
update yarn.lock
flyingsilverfin Jan 10, 2022
ec036f2
create UtilSteps to match other clients
flyingsilverfin Jan 11, 2022
b1fc1ae
fix build
flyingsilverfin Jan 11, 2022
7ab502d
regenerate yarn.lock
flyingsilverfin Jan 11, 2022
adba8df
update grpc and other deps
flyingsilverfin Jan 11, 2022
5b7781e
Revert updated deps
flyingsilverfin Jan 11, 2022
0a66f3e
add assertions
flyingsilverfin Jan 11, 2022
c074d17
Address comments
flyingsilverfin Jan 11, 2022
f14db6e
use 2.6.1 tag release
flyingsilverfin Jan 11, 2022
a13bfc6
fix error message
flyingsilverfin Jan 14, 2022
a35d1cd
Add async to client close() and close client after tests correctly
flyingsilverfin Jan 14, 2022
ae3c735
trigger ci
flyingsilverfin Jan 14, 2022
6c808c5
Add console.logs
flyingsilverfin Jan 14, 2022
2e16729
Add more error logging
flyingsilverfin Jan 14, 2022
11f67fd
add more prints
flyingsilverfin Jan 14, 2022
debad9b
Assert database is contained when created
flyingsilverfin Jan 14, 2022
513ca53
use property getter
flyingsilverfin Jan 14, 2022
1935793
trigger ci
flyingsilverfin Jan 14, 2022
70b1a52
Use server that logs database operations
flyingsilverfin Jan 14, 2022
3f31ea6
update dependencies of grpc and protobuf
flyingsilverfin Jan 14, 2022
9c5ac16
remove extra check on db create
flyingsilverfin Jan 14, 2022
e05c95a
Add sleep to only core tests
flyingsilverfin Jan 14, 2022
3efdfa9
add sandbox_debug
flyingsilverfin Jan 14, 2022
dbdff45
Add sandbox debug
flyingsilverfin Jan 14, 2022
2e77538
trigger ci
flyingsilverfin Jan 14, 2022
2291c2d
Only run match test
flyingsilverfin Jan 14, 2022
64305c9
trigger ci
flyingsilverfin Jan 14, 2022
c994ed9
Uncomment line
flyingsilverfin Jan 14, 2022
6000f1e
add more logging to server side
flyingsilverfin Jan 14, 2022
7937d34
Add print to databaseimpl delete()
flyingsilverfin Jan 14, 2022
81b6d02
add await() to Before() and After() methods and restore all ci tests
flyingsilverfin Jan 14, 2022
d0f135f
cleanup console logs for debugging
flyingsilverfin Jan 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions api/connection/TypeDBOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
* under the License.
*/

import { Options } from "typedb-protocol/common/options_pb";
import { ErrorMessage } from "../../common/errors/ErrorMessage";
import { TypeDBClientError } from "../../common/errors/TypeDBClientError";
import {Options} from "typedb-protocol/common/options_pb";
import {ErrorMessage} from "../../common/errors/ErrorMessage";
import {TypeDBClientError} from "../../common/errors/TypeDBClientError";
import NEGATIVE_VALUE_NOT_ALLOWED = ErrorMessage.Client.NEGATIVE_VALUE_NOT_ALLOWED;

namespace Opts {
Expand All @@ -33,6 +33,7 @@ namespace Opts {
prefetchSize?: number;
prefetch?: boolean;
sessionIdleTimeoutMillis?: number;
transactionTimeoutMillis?: number;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new transaction option: millis before timeout

schemaLockAcquireTimeoutMillis?: number;
}

Expand All @@ -50,6 +51,7 @@ namespace Opts {
if (options.prefetchSize != null) optionsProto.setPrefetchSize(options.prefetchSize);
if (options.prefetch != null) optionsProto.setPrefetch(options.prefetch);
if (options.sessionIdleTimeoutMillis != null) optionsProto.setSessionIdleTimeoutMillis(options.sessionIdleTimeoutMillis);
if (options.transactionTimeoutMillis != null) optionsProto.setTransactionTimeoutMillis(options.transactionTimeoutMillis);
if (options.schemaLockAcquireTimeoutMillis != null) optionsProto.setSchemaLockAcquireTimeoutMillis(options.schemaLockAcquireTimeoutMillis);
if (options.isCluster()) {
const clusterOptions = options as Opts.Cluster;
Expand All @@ -69,6 +71,7 @@ export class TypeDBOptions implements Opts.Core {
private _prefetchSize: number;
private _prefetch: boolean;
private _sessionIdleTimeoutMillis: number;
private _transactionTimeoutMillis: number;
private _schemaLockAcquireTimeoutMillis: number;

constructor(obj: { [K in keyof Opts.Core]: Opts.Core[K] } = {}) {
Expand Down Expand Up @@ -138,11 +141,22 @@ export class TypeDBOptions implements Opts.Core {
return this._sessionIdleTimeoutMillis;
}

set sessionIdleTimeoutMillis(value: number) {
if (value < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(value));
set sessionIdleTimeoutMillis(millis: number) {
if (millis < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(millis));
}
this._sessionIdleTimeoutMillis = millis;
}

get transactionTimeoutMillis() {
return this._transactionTimeoutMillis;
}

set transactionTimeoutMillis(millis: number) {
if (millis < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(millis));
}
this._sessionIdleTimeoutMillis = value;
this._transactionTimeoutMillis = millis;
Comment on lines +151 to +159
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setter/getter for transaction timeouts

}

get schemaLockAcquireTimeoutMillis() {
Expand Down
32 changes: 17 additions & 15 deletions common/errors/ErrorMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,22 @@ export namespace ErrorMessage {
export const SESSION_ID_EXISTS = new Client(1, (args: Stringable[]) => `The newly opened session id '${args[0]}' already exists`);
export const SESSION_CLOSED = new Client(2, () => `Session is closed.`);
export const TRANSACTION_CLOSED = new Client(3, () => `The transaction has been closed and no further operation is allowed.`);
export const UNABLE_TO_CONNECT = new Client(4, () => `Unable to connect to TypeDB server.`);
export const NEGATIVE_VALUE_NOT_ALLOWED = new Client(5, (args: Stringable[]) => `Value cannot be less than 1, was: '${args[0]}'.`);
export const MISSING_DB_NAME = new Client(6, () => `Database name cannot be null.`);
export const DB_DOES_NOT_EXIST = new Client(7, (args: Stringable[]) => `The database '${args[0]}' does not exist.`);
export const UNKNOWN_STREAM_STATE = new Client(8, (args: Stringable[]) => `RPC transaction stream response '${args[0]}' is unknown.`);
export const MISSING_RESPONSE = new Client(9, (args: Stringable[]) => `The required field 'res' of type '${args[0]}' was not set.`);
export const UNKNOWN_REQUEST_ID = new Client(10, (args: Stringable[]) => `Received a response with unknown request id '${args[0]}'.`);
export const CLUSTER_NO_PRIMARY_REPLICA_YET = new Client(11, (args: Stringable[]) => `No replica has been marked as the primary replica for latest known term '${args[0]}'.`);
export const CLUSTER_UNABLE_TO_CONNECT = new Client(12, (args: Stringable[]) => `Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '${args[0]}'.`);
export const CLUSTER_REPLICA_NOT_PRIMARY = new Client(13, () => `The replica is not the primary replica.`);
export const CLUSTER_ALL_NODES_FAILED = new Client(14, (args: Stringable[]) => `Attempted connecting to all cluster members, but the following errors occurred: \n'${args[0]}'`);
export const CLUSTER_USER_DOES_NOT_EXIST = new Client(15, (args: Stringable[]) => `The user '${args[0]}' does not exist.`);
export const CLUSTER_TOKEN_CREDENTIAL_INVALID = new Client(16, (args: Stringable[]) => `Invalid token credential.`);
export const CLUSTER_INVALID_ROOT_CA_PATH = new Client(17, (args: Stringable[]) => `The provided Root CA path '${args[0]}' does not exist`);
export const UNRECOGNISED_SESSION_TYPE = new Client(18, (args: Stringable[]) => `Session type '${args[0]}' was not recognised.`);
export const TRANSACTION_CLOSED_WITH_ERRORS = new Client(4, (args) => `The transaction has been closed with error(s): \n${args[0]}.`)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new exception to report a transaction is closed with errors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for consistency with other constants in this file let's add the type annotation of args:

Suggested change
export const TRANSACTION_CLOSED_WITH_ERRORS = new Client(4, (args) => `The transaction has been closed with error(s): \n${args[0]}.`)
export const TRANSACTION_CLOSED_WITH_ERRORS = new Client(4, (args: Stringable[]) => `The transaction has been closed with error(s): \n${args[0]}.`)

export const UNABLE_TO_CONNECT = new Client(5, () => `Unable to connect to TypeDB server.`);
export const NEGATIVE_VALUE_NOT_ALLOWED = new Client(6, (args: Stringable[]) => `Value cannot be less than 1, was: '${args[0]}'.`);
export const MISSING_DB_NAME = new Client(7, () => `Database name cannot be null.`);
export const DB_DOES_NOT_EXIST = new Client(8, (args: Stringable[]) => `The database '${args[0]}' does not exist.`);
export const UNKNOWN_STREAM_STATE = new Client(9, (args: Stringable[]) => `RPC transaction stream response '${args[0]}' is unknown.`);
export const MISSING_RESPONSE = new Client(10, (args: Stringable[]) => `The required field 'res' of type '${args[0]}' was not set.`);
export const UNKNOWN_REQUEST_ID = new Client(11, (args: Stringable[]) => `Received a response with unknown request id '${args[0]}'.`);
export const CLUSTER_NO_PRIMARY_REPLICA_YET = new Client(12, (args: Stringable[]) => `No replica has been marked as the primary replica for latest known term '${args[0]}'.`);
export const CLUSTER_UNABLE_TO_CONNECT = new Client(13, (args: Stringable[]) => `Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '${args[1]}'.`);
export const CLUSTER_REPLICA_NOT_PRIMARY = new Client(14, () => `The replica is not the primary replica.`);
export const CLUSTER_ALL_NODES_FAILED = new Client(15, (args: Stringable[]) => `Attempted connecting to all cluster members, but the following errors occurred: \n'${args[0]}'`);
export const CLUSTER_USER_DOES_NOT_EXIST = new Client(16, (args: Stringable[]) => `The user '${args[0]}' does not exist.`);
export const CLUSTER_TOKEN_CREDENTIAL_INVALID = new Client(17, (args: Stringable[]) => `Invalid token credential.`);
export const CLUSTER_INVALID_ROOT_CA_PATH = new Client(18, (args: Stringable[]) => `The provided Root CA path '${args[0]}' does not exist`);
export const UNRECOGNISED_SESSION_TYPE = new Client(19, (args: Stringable[]) => `Session type '${args[0]}' was not recognised.`);
}

export class Concept extends ErrorMessage {
Expand Down Expand Up @@ -132,5 +133,6 @@ export namespace ErrorMessage {
export namespace Internal {
export const ILLEGAL_CAST = new Internal(1, (args: Stringable[]) => `Illegal casting operation from '${args[0]}' to '${args[1]}'.`);
export const ILLEGAL_ARGUMENT = new Internal(2, (args: Stringable[]) => `Illegal argument provided: '${args[0]}'`);
export const ILLEGAL_STATE = new Internal(3, () => `Illegal state.`);
}
}
8 changes: 8 additions & 0 deletions common/util/BlockingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ export class BlockingQueue<T> {
return this._promises.shift();
}

drain(): Promise<T>[] {
const values = [];
for (let i = 0; i < this._promises.length; i++) {
values.push(this._promises.shift());
}
return values;
}

private addPromise(): void {
this._promises.push(new Promise(resolve => {
this._resolvers.push(resolve);
Expand Down
43 changes: 26 additions & 17 deletions connection/TypeDBTransactionImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,24 @@
* under the License.
*/

import { Transaction } from "typedb-protocol/common/transaction_pb";
import { ConceptManager } from "../api/concept/ConceptManager";
import { TypeDBOptions } from "../api/connection/TypeDBOptions";
import { TransactionType, TypeDBTransaction } from "../api/connection/TypeDBTransaction";
import { LogicManager } from "../api/logic/LogicManager";
import { QueryManager } from "../api/query/QueryManager";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { Stream } from "../common/util/Stream";
import { ConceptManagerImpl } from "../concept/ConceptManagerImpl";
import { LogicManagerImpl } from "../logic/LogicManagerImpl";
import { QueryManagerImpl } from "../query/QueryManagerImpl";
import { BidirectionalStream } from "../stream/BidirectionalStream";
import { TypeDBSessionImpl } from "./TypeDBSessionImpl";
import {Transaction} from "typedb-protocol/common/transaction_pb";
import {ConceptManager} from "../api/concept/ConceptManager";
import {TypeDBOptions} from "../api/connection/TypeDBOptions";
import {TransactionType, TypeDBTransaction} from "../api/connection/TypeDBTransaction";
import {LogicManager} from "../api/logic/LogicManager";
import {QueryManager} from "../api/query/QueryManager";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {Stream} from "../common/util/Stream";
import {ConceptManagerImpl} from "../concept/ConceptManagerImpl";
import {LogicManagerImpl} from "../logic/LogicManagerImpl";
import {QueryManagerImpl} from "../query/QueryManagerImpl";
import {BidirectionalStream} from "../stream/BidirectionalStream";
import {TypeDBSessionImpl} from "./TypeDBSessionImpl";
import assert = require("assert");
Comment on lines +22 to +37
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto format....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've never used assert before in Client NodeJS's production code.

Unlike in Java, there is no clean way to turn off assertions in NodeJS. (Methods typically used include hot module replacement - literally stripping the statements out of the source code before running it - and modifying the assert prototype to replace assert with a no-op function at runtime!)

I think we're better off replacing it with throw new TypeDBClientError. We've done this elsewhere (see TypeDBClientImpl.session which throws a Client NodeJS-only error if the session ID already exists).

flyingsilverfin marked this conversation as resolved.
Show resolved Hide resolved
import TRANSACTION_CLOSED = ErrorMessage.Client.TRANSACTION_CLOSED;
import TRANSACTION_CLOSED_WITH_ERRORS = ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS;

export class TypeDBTransactionImpl implements TypeDBTransaction.Extended {
private readonly _session: TypeDBSessionImpl;
Expand Down Expand Up @@ -105,13 +107,20 @@ export class TypeDBTransactionImpl implements TypeDBTransaction.Extended {
}

public async rpcExecute(request: Transaction.Req, batch?: boolean): Promise<Transaction.Res> {
if (!this.isOpen()) throw new TypeDBClientError(TRANSACTION_CLOSED);
if (!this.isOpen()) this.throwTransactionClosed()
const useBatch = batch !== false;
return this._bidirectionalStream.single(request, useBatch);
}

public rpcStream(request: Transaction.Req): Stream<Transaction.ResPart> {
if (!this.isOpen()) throw new TypeDBClientError(TRANSACTION_CLOSED);
if (!this.isOpen()) this.throwTransactionClosed();
return this._bidirectionalStream.stream(request);
}

private throwTransactionClosed(): void {
assert(!this.isOpen());
const errors = this._bidirectionalStream.getErrors();
if (errors.length == 0) throw new TypeDBClientError(TRANSACTION_CLOSED);
else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(errors));
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can differentiate between a transaction closed with errors and one without errors

}
4 changes: 2 additions & 2 deletions dependencies/vaticle/artifacts.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def vaticle_typedb_artifacts():
artifact_name = "typedb-server-{platform}-{version}.{ext}",
tag_source = deployment["artifact.release"],
commit_source = deployment["artifact.snapshot"],
commit = "6ed020e52fe379d1100f64511805ed344c7a68db"
commit = "2367157bdd898e474198726d0ef5446372a73314",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update core artifact with transaction timeout

)

def vaticle_typedb_cluster_artifacts():
Expand All @@ -39,5 +39,5 @@ def vaticle_typedb_cluster_artifacts():
artifact_name = "typedb-cluster-all-{platform}-{version}.{ext}",
tag_source = deployment_private["artifact.release"],
commit_source = deployment_private["artifact.snapshot"],
commit = "f6d9eae58c83c165d932c0433a3a0d858e206fab",
commit = "f6d9eae58c83c165d932c0433a3a0d858e206fab"
)
4 changes: 2 additions & 2 deletions dependencies/vaticle/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def vaticle_typedb_common():
git_repository(
name = "vaticle_typedb_common",
remote = "https://github.com/vaticle/typedb-common",
tag = "2.5.0" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
commit = "a436f7c9b64060d7fed718a3be60895c15bd893f" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
)

def vaticle_typedb_behaviour():
git_repository(
name = "vaticle_typedb_behaviour",
remote = "https://github.com/vaticle/typedb-behaviour",
commit = "042cbf2a6c3b8b9d7a8c3b8d74e796da968523fd", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_behaviour
commit = "d92d840dd40cc8393936d6f6042820ebcd3a9cef", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_behaviour
Comment on lines 32 to +42
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump dependencies

)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"dependencies": {
"@grpc/grpc-js": "1.3.2",
"google-protobuf": "3.14.0",
"typedb-protocol": "2.5.0",
"typedb-protocol": "https://repo.vaticle.com/repository/npm-snapshot/typedb-protocol/-/typedb-protocol-0.0.0-4ce4e1339995c8982914e9fd3177273b693609b7.tgz",
"uuid": "8.3.2"
},
"devDependencies": {
Expand Down
4 changes: 4 additions & 0 deletions stream/BidirectionalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,8 @@ export class BidirectionalStream {
if (!queue) throw new TypeDBClientError(UNKNOWN_REQUEST_ID.message(requestId));
queue.put(res);
}

getErrors(): (Error|string)[] {
return (this._responseCollector.getErrors()).concat(this._responsePartCollector.getErrors());
}
Comment on lines +131 to +133
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retrieve errors from the different collectors

}
47 changes: 23 additions & 24 deletions stream/ResponseCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
* under the License.
*/

import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { BlockingQueue } from "../common/util/BlockingQueue";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {BlockingQueue} from "../common/util/BlockingQueue";

export class ResponseCollector<T> {

Expand All @@ -43,18 +43,27 @@ export class ResponseCollector<T> {

close(error?: Error | string) {
Object.values(this._collectors).forEach(collector => collector.close(error));
for (const requestId in this._collectors) delete this._collectors[requestId];
}

getErrors(): (Error | string)[] {
const errors: (Error | string)[] = [];
for (const requestId in this._collectors) {
const error = this._collectors[requestId].getError();
if (error) errors.push(error);
delete this._collectors[requestId];
}
return errors;
}
Comment on lines +48 to +56
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get errors from each collector, and delete the collector after

}

export namespace ResponseCollector {

import TRANSACTION_CLOSED = ErrorMessage.Client.TRANSACTION_CLOSED;
import ILLEGAL_STATE = ErrorMessage.Internal.ILLEGAL_STATE;

export class ResponseQueue<T> {

private readonly _queue: BlockingQueue<QueueElement>;
private _error: string | Error = null;

constructor() {
this._queue = new BlockingQueue<QueueElement>()
Expand All @@ -63,23 +72,23 @@ export namespace ResponseCollector {
async take(): Promise<T> {
const element = await this._queue.take();
if (element.isResponse()) return (element as Response<T>).value;
else {
if ((element as Done).hasError()) {
throw new TypeDBClientError((element as Done).error);
} else {
throw new TypeDBClientError(TRANSACTION_CLOSED);
}
}
else if (element.isDone() && !this._error) throw new TypeDBClientError(TRANSACTION_CLOSED);
else if (element.isDone() && this._error) throw new TypeDBClientError(this._error);
else throw new TypeDBClientError(ILLEGAL_STATE);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's a Done, and there is an error state, we throw the error. Otherwise we just say it's a closed transaction :)

}

put(element: T): void {
this._queue.add(new Response(element));
}

close(error?: Error | string): void {
this._queue.add(new Done(error));
this._error = error;
this._queue.add(new Done());
Comment on lines +86 to +87
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

major change: when we receive an error, we immediate put the ResponseCollector into a error state, and put Done message on the queue to unblock any waiting, which will return as an error above if there is an error set

}

getError(): string | Error {
return this._error;
}
}

class QueueElement {
Expand Down Expand Up @@ -113,19 +122,9 @@ export namespace ResponseCollector {
}

class Done extends QueueElement {
private readonly _error?: Error | string;

constructor(error?: Error | string) {
constructor() {
super();
this._error = error;
}

hasError(): boolean {
return this._error != null;
}

get error(): Error | string {
return this._error;
Comment on lines -118 to -128
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keeping an error inside the Done element is no longer necessary

}

isDone(): boolean {
Expand Down
Loading