Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement and test configurable transaction timeouts #197

Merged
merged 41 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
213fae6
update deps
flyingsilverfin Dec 21, 2021
5caed4a
Implement timeout option and error draining
flyingsilverfin Dec 21, 2021
1ad353b
Implement new steps for session and transaction options
flyingsilverfin Dec 21, 2021
5edba1b
fix test runner and imports
flyingsilverfin Dec 21, 2021
929d7f8
use property setters
flyingsilverfin Dec 21, 2021
892b74a
Errors are captured on receive
flyingsilverfin Jan 10, 2022
a93032e
fix linter
flyingsilverfin Jan 10, 2022
7eed859
cleanup and bump to protocol from master
flyingsilverfin Jan 10, 2022
e77b84a
revert yarn.lock
flyingsilverfin Jan 10, 2022
4de4eb9
update yarn.lock
flyingsilverfin Jan 10, 2022
ec036f2
create UtilSteps to match other clients
flyingsilverfin Jan 11, 2022
b1fc1ae
fix build
flyingsilverfin Jan 11, 2022
7ab502d
regenerate yarn.lock
flyingsilverfin Jan 11, 2022
adba8df
update grpc and other deps
flyingsilverfin Jan 11, 2022
5b7781e
Revert updated deps
flyingsilverfin Jan 11, 2022
0a66f3e
add assertions
flyingsilverfin Jan 11, 2022
c074d17
Address comments
flyingsilverfin Jan 11, 2022
f14db6e
use 2.6.1 tag release
flyingsilverfin Jan 11, 2022
a13bfc6
fix error message
flyingsilverfin Jan 14, 2022
a35d1cd
Add async to client close() and close client after tests correctly
flyingsilverfin Jan 14, 2022
ae3c735
trigger ci
flyingsilverfin Jan 14, 2022
6c808c5
Add console.logs
flyingsilverfin Jan 14, 2022
2e16729
Add more error logging
flyingsilverfin Jan 14, 2022
11f67fd
add more prints
flyingsilverfin Jan 14, 2022
debad9b
Assert database is contained when created
flyingsilverfin Jan 14, 2022
513ca53
use property getter
flyingsilverfin Jan 14, 2022
1935793
trigger ci
flyingsilverfin Jan 14, 2022
70b1a52
Use server that logs database operations
flyingsilverfin Jan 14, 2022
3f31ea6
update dependencies of grpc and protobuf
flyingsilverfin Jan 14, 2022
9c5ac16
remove extra check on db create
flyingsilverfin Jan 14, 2022
e05c95a
Add sleep to only core tests
flyingsilverfin Jan 14, 2022
3efdfa9
add sandbox_debug
flyingsilverfin Jan 14, 2022
dbdff45
Add sandbox debug
flyingsilverfin Jan 14, 2022
2e77538
trigger ci
flyingsilverfin Jan 14, 2022
2291c2d
Only run match test
flyingsilverfin Jan 14, 2022
64305c9
trigger ci
flyingsilverfin Jan 14, 2022
c994ed9
Uncomment line
flyingsilverfin Jan 14, 2022
6000f1e
add more logging to server side
flyingsilverfin Jan 14, 2022
7937d34
Add print to databaseimpl delete()
flyingsilverfin Jan 14, 2022
81b6d02
add await() to Before() and After() methods and restore all ci tests
flyingsilverfin Jan 14, 2022
d0f135f
cleanup console logs for debugging
flyingsilverfin Jan 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ behaviour_steps_common = [
"//test/behaviour/connection/session:SessionSteps.ts",
"//test/behaviour/connection/transaction:TransactionSteps.ts",
"//test/behaviour/typeql:TypeQLSteps.ts",
"//test/behaviour/util:UtilSteps.ts",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following other clients' we extract the wait steps into their own UtilSteps file

"//test/behaviour/util:Util.ts",
"//test:tsconfig.json"
] + glob(["node_modules/**"])
Expand Down
2 changes: 1 addition & 1 deletion api/connection/TypeDBClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface TypeDBClient {

asCluster(): TypeDBClient.Cluster;

close(): void;
close(): Promise<void>;
}

export namespace TypeDBClient {
Expand Down
28 changes: 21 additions & 7 deletions api/connection/TypeDBOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
* under the License.
*/

import { Options } from "typedb-protocol/common/options_pb";
import { ErrorMessage } from "../../common/errors/ErrorMessage";
import { TypeDBClientError } from "../../common/errors/TypeDBClientError";
import {Options} from "typedb-protocol/common/options_pb";
import {ErrorMessage} from "../../common/errors/ErrorMessage";
import {TypeDBClientError} from "../../common/errors/TypeDBClientError";
import NEGATIVE_VALUE_NOT_ALLOWED = ErrorMessage.Client.NEGATIVE_VALUE_NOT_ALLOWED;

namespace Opts {
Expand All @@ -33,6 +33,7 @@ namespace Opts {
prefetchSize?: number;
prefetch?: boolean;
sessionIdleTimeoutMillis?: number;
transactionTimeoutMillis?: number;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new transaction option: millis before timeout

schemaLockAcquireTimeoutMillis?: number;
}

Expand All @@ -50,6 +51,7 @@ namespace Opts {
if (options.prefetchSize != null) optionsProto.setPrefetchSize(options.prefetchSize);
if (options.prefetch != null) optionsProto.setPrefetch(options.prefetch);
if (options.sessionIdleTimeoutMillis != null) optionsProto.setSessionIdleTimeoutMillis(options.sessionIdleTimeoutMillis);
if (options.transactionTimeoutMillis != null) optionsProto.setTransactionTimeoutMillis(options.transactionTimeoutMillis);
if (options.schemaLockAcquireTimeoutMillis != null) optionsProto.setSchemaLockAcquireTimeoutMillis(options.schemaLockAcquireTimeoutMillis);
if (options.isCluster()) {
const clusterOptions = options as Opts.Cluster;
Expand All @@ -69,6 +71,7 @@ export class TypeDBOptions implements Opts.Core {
private _prefetchSize: number;
private _prefetch: boolean;
private _sessionIdleTimeoutMillis: number;
private _transactionTimeoutMillis: number;
private _schemaLockAcquireTimeoutMillis: number;

constructor(obj: { [K in keyof Opts.Core]: Opts.Core[K] } = {}) {
Expand Down Expand Up @@ -138,11 +141,22 @@ export class TypeDBOptions implements Opts.Core {
return this._sessionIdleTimeoutMillis;
}

set sessionIdleTimeoutMillis(value: number) {
if (value < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(value));
set sessionIdleTimeoutMillis(millis: number) {
if (millis < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(millis));
}
this._sessionIdleTimeoutMillis = millis;
}

get transactionTimeoutMillis() {
return this._transactionTimeoutMillis;
}

set transactionTimeoutMillis(millis: number) {
if (millis < 1) {
throw new TypeDBClientError(NEGATIVE_VALUE_NOT_ALLOWED.message(millis));
}
this._sessionIdleTimeoutMillis = value;
this._transactionTimeoutMillis = millis;
Comment on lines +151 to +159
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setter/getter for transaction timeouts

}

get schemaLockAcquireTimeoutMillis() {
Expand Down
2 changes: 1 addition & 1 deletion api/connection/TypeDBTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export interface TypeDBTransaction {

rollback(): void;

close(): void;
close(): Promise<void>;
}

export interface TransactionType {
Expand Down
32 changes: 17 additions & 15 deletions common/errors/ErrorMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,22 @@ export namespace ErrorMessage {
export const SESSION_ID_EXISTS = new Client(1, (args: Stringable[]) => `The newly opened session id '${args[0]}' already exists`);
export const SESSION_CLOSED = new Client(2, () => `Session is closed.`);
export const TRANSACTION_CLOSED = new Client(3, () => `The transaction has been closed and no further operation is allowed.`);
export const UNABLE_TO_CONNECT = new Client(4, () => `Unable to connect to TypeDB server.`);
export const NEGATIVE_VALUE_NOT_ALLOWED = new Client(5, (args: Stringable[]) => `Value cannot be less than 1, was: '${args[0]}'.`);
export const MISSING_DB_NAME = new Client(6, () => `Database name cannot be null.`);
export const DB_DOES_NOT_EXIST = new Client(7, (args: Stringable[]) => `The database '${args[0]}' does not exist.`);
export const UNKNOWN_STREAM_STATE = new Client(8, (args: Stringable[]) => `RPC transaction stream response '${args[0]}' is unknown.`);
export const MISSING_RESPONSE = new Client(9, (args: Stringable[]) => `The required field 'res' of type '${args[0]}' was not set.`);
export const UNKNOWN_REQUEST_ID = new Client(10, (args: Stringable[]) => `Received a response with unknown request id '${args[0]}'.`);
export const CLUSTER_NO_PRIMARY_REPLICA_YET = new Client(11, (args: Stringable[]) => `No replica has been marked as the primary replica for latest known term '${args[0]}'.`);
export const CLUSTER_UNABLE_TO_CONNECT = new Client(12, (args: Stringable[]) => `Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '${args[0]}'.`);
export const CLUSTER_REPLICA_NOT_PRIMARY = new Client(13, () => `The replica is not the primary replica.`);
export const CLUSTER_ALL_NODES_FAILED = new Client(14, (args: Stringable[]) => `Attempted connecting to all cluster members, but the following errors occurred: \n'${args[0]}'`);
export const CLUSTER_USER_DOES_NOT_EXIST = new Client(15, (args: Stringable[]) => `The user '${args[0]}' does not exist.`);
export const CLUSTER_TOKEN_CREDENTIAL_INVALID = new Client(16, (args: Stringable[]) => `Invalid token credential.`);
export const CLUSTER_INVALID_ROOT_CA_PATH = new Client(17, (args: Stringable[]) => `The provided Root CA path '${args[0]}' does not exist`);
export const UNRECOGNISED_SESSION_TYPE = new Client(18, (args: Stringable[]) => `Session type '${args[0]}' was not recognised.`);
export const TRANSACTION_CLOSED_WITH_ERRORS = new Client(4, (args: Stringable[]) => `The transaction has been closed with error(s): \n${args[0]}.`)
export const UNABLE_TO_CONNECT = new Client(5, () => `Unable to connect to TypeDB server.`);
export const NEGATIVE_VALUE_NOT_ALLOWED = new Client(6, (args: Stringable[]) => `Value cannot be less than 1, was: '${args[0]}'.`);
export const MISSING_DB_NAME = new Client(7, () => `Database name cannot be null.`);
export const DB_DOES_NOT_EXIST = new Client(8, (args: Stringable[]) => `The database '${args[0]}' does not exist.`);
export const UNKNOWN_STREAM_STATE = new Client(9, (args: Stringable[]) => `RPC transaction stream response '${args[0]}' is unknown.`);
export const MISSING_RESPONSE = new Client(10, (args: Stringable[]) => `The required field 'res' of type '${args[0]}' was not set.`);
export const UNKNOWN_REQUEST_ID = new Client(11, (args: Stringable[]) => `Received a response with unknown request id '${args[0]}'.`);
export const CLUSTER_NO_PRIMARY_REPLICA_YET = new Client(12, (args: Stringable[]) => `No replica has been marked as the primary replica for latest known term '${args[0]}'.`);
export const CLUSTER_UNABLE_TO_CONNECT = new Client(13, (args: Stringable[]) => `Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '${args[1]}'.`);
export const CLUSTER_REPLICA_NOT_PRIMARY = new Client(14, () => `The replica is not the primary replica.`);
export const CLUSTER_ALL_NODES_FAILED = new Client(15, (args: Stringable[]) => `Attempted connecting to all cluster members, but the following errors occurred: \n'${args[0]}'`);
export const CLUSTER_USER_DOES_NOT_EXIST = new Client(16, (args: Stringable[]) => `The user '${args[0]}' does not exist.`);
export const CLUSTER_TOKEN_CREDENTIAL_INVALID = new Client(17, (args: Stringable[]) => `Invalid token credential.`);
export const CLUSTER_INVALID_ROOT_CA_PATH = new Client(18, (args: Stringable[]) => `The provided Root CA path '${args[0]}' does not exist`);
export const UNRECOGNISED_SESSION_TYPE = new Client(19, (args: Stringable[]) => `Session type '${args[0]}' was not recognised.`);
}

export class Concept extends ErrorMessage {
Expand Down Expand Up @@ -132,5 +133,6 @@ export namespace ErrorMessage {
export namespace Internal {
export const ILLEGAL_CAST = new Internal(1, (args: Stringable[]) => `Illegal casting operation from '${args[0]}' to '${args[1]}'.`);
export const ILLEGAL_ARGUMENT = new Internal(2, (args: Stringable[]) => `Illegal argument provided: '${args[0]}'`);
export const ILLEGAL_STATE = new Internal(3, () => `Illegal state.`);
}
}
13 changes: 8 additions & 5 deletions common/rpc/TypeDBStub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
*/


import {ClientDuplexStream, ServiceError} from "@grpc/grpc-js";
import { Session } from "typedb-protocol/common/session_pb";
import { CoreDatabase as CoreDatabaseProto, CoreDatabaseManager as CoreDatabaseMgrProto } from "typedb-protocol/core/core_database_pb";
import { TypeDBClient } from "typedb-protocol/core/core_service_grpc_pb";
import { TypeDBDatabaseImpl } from "../../connection/TypeDBDatabaseImpl";
import {ClientDuplexStream} from "@grpc/grpc-js";
import {Session} from "typedb-protocol/common/session_pb";
import {
CoreDatabase as CoreDatabaseProto,
CoreDatabaseManager as CoreDatabaseMgrProto
} from "typedb-protocol/core/core_database_pb";
import {TypeDBClient} from "typedb-protocol/core/core_service_grpc_pb";
import {TypeDBDatabaseImpl} from "../../connection/TypeDBDatabaseImpl";
import * as common_transaction_pb from "typedb-protocol/common/transaction_pb";
import {TypeDBClientError} from "../errors/TypeDBClientError";

Expand Down
6 changes: 4 additions & 2 deletions connection/TypeDBClientImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ export abstract class TypeDBClientImpl implements TypeDBClient {
throw new TypeDBClientError(ILLEGAL_CAST.message(this.constructor.toString(), "ClusterClient"));
}

close(): void {
async close(): Promise<void> {
if (this._isOpen) {
this._isOpen = false;
Object.values(this._sessions).forEach(s => s.close());
for (const session of Object.values(Object.values(this._sessions))) {
await session.close();
}
this._requestTransmitter.close();
}
}
Expand Down
10 changes: 5 additions & 5 deletions connection/TypeDBDatabaseImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
* under the License.
*/

import { Database } from "../api/connection/database/Database";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { TypeDBStub } from "../common/rpc/TypeDBStub";
import {Database} from "../api/connection/database/Database";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {TypeDBStub} from "../common/rpc/TypeDBStub";

export class TypeDBDatabaseImpl implements Database {

Expand Down
19 changes: 10 additions & 9 deletions connection/TypeDBDatabaseManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
* under the License.
*/

import { Database } from "../api/connection/database/Database";
import { DatabaseManager } from "../api/connection/database/DatabaseManager";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { TypeDBStub } from "../common/rpc/TypeDBStub";
import { TypeDBDatabaseImpl } from "./TypeDBDatabaseImpl";
import {Database} from "../api/connection/database/Database";
import {DatabaseManager} from "../api/connection/database/DatabaseManager";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {TypeDBStub} from "../common/rpc/TypeDBStub";
import {TypeDBDatabaseImpl} from "./TypeDBDatabaseImpl";

export class TypeDBDatabaseManagerImpl implements DatabaseManager {

Expand All @@ -36,8 +36,9 @@ export class TypeDBDatabaseManagerImpl implements DatabaseManager {
}

public async get(name: string): Promise<Database> {
if (await this.contains(name)) return new TypeDBDatabaseImpl(name, this._stub);
else throw new TypeDBClientError(ErrorMessage.Client.DB_DOES_NOT_EXIST);
if (await this.contains(name)) {
return new TypeDBDatabaseImpl(name, this._stub);
} else throw new TypeDBClientError(ErrorMessage.Client.DB_DOES_NOT_EXIST.message(name));
}

public create(name: string): Promise<void> {
Expand Down
27 changes: 14 additions & 13 deletions connection/TypeDBSessionImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
* under the License.
*/

import { Database } from "../api/connection/database/Database";
import { TypeDBOptions } from "../api/connection/TypeDBOptions";
import { SessionType, TypeDBSession } from "../api/connection/TypeDBSession";
import { TransactionType, TypeDBTransaction } from "../api/connection/TypeDBTransaction";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { TypeDBStub } from "../common/rpc/TypeDBStub";
import { RequestTransmitter } from "../stream/RequestTransmitter";
import { CoreClient } from "./core/CoreClient";
import { TypeDBTransactionImpl } from "./TypeDBTransactionImpl";
import SESSION_CLOSED = ErrorMessage.Client.SESSION_CLOSED;
import {Database} from "../api/connection/database/Database";
import {TypeDBOptions} from "../api/connection/TypeDBOptions";
import {SessionType, TypeDBSession} from "../api/connection/TypeDBSession";
import {TransactionType, TypeDBTransaction} from "../api/connection/TypeDBTransaction";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {TypeDBStub} from "../common/rpc/TypeDBStub";
import {RequestTransmitter} from "../stream/RequestTransmitter";
import {TypeDBTransactionImpl} from "./TypeDBTransactionImpl";
import {TypeDBClientImpl} from "./TypeDBClientImpl";
import SESSION_CLOSED = ErrorMessage.Client.SESSION_CLOSED;

export class TypeDBSessionImpl implements TypeDBSession {

Expand Down Expand Up @@ -70,7 +69,9 @@ export class TypeDBSessionImpl implements TypeDBSession {
async close(): Promise<void> {
if (this._isOpen) {
this._isOpen = false;
this._transactions.forEach(tx => tx.close());
for (const tx of this._transactions) {
await tx.close();
}
this._client.closeSession(this);
clearTimeout(this._pulse);
const req = RequestBuilder.Session.closeReq(this._id);
Expand Down
44 changes: 27 additions & 17 deletions connection/TypeDBTransactionImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@
* under the License.
*/

import { Transaction } from "typedb-protocol/common/transaction_pb";
import { ConceptManager } from "../api/concept/ConceptManager";
import { TypeDBOptions } from "../api/connection/TypeDBOptions";
import { TransactionType, TypeDBTransaction } from "../api/connection/TypeDBTransaction";
import { LogicManager } from "../api/logic/LogicManager";
import { QueryManager } from "../api/query/QueryManager";
import { ErrorMessage } from "../common/errors/ErrorMessage";
import { TypeDBClientError } from "../common/errors/TypeDBClientError";
import { RequestBuilder } from "../common/rpc/RequestBuilder";
import { Stream } from "../common/util/Stream";
import { ConceptManagerImpl } from "../concept/ConceptManagerImpl";
import { LogicManagerImpl } from "../logic/LogicManagerImpl";
import { QueryManagerImpl } from "../query/QueryManagerImpl";
import { BidirectionalStream } from "../stream/BidirectionalStream";
import { TypeDBSessionImpl } from "./TypeDBSessionImpl";
import {Transaction} from "typedb-protocol/common/transaction_pb";
import {ConceptManager} from "../api/concept/ConceptManager";
import {TypeDBOptions} from "../api/connection/TypeDBOptions";
import {TransactionType, TypeDBTransaction} from "../api/connection/TypeDBTransaction";
import {LogicManager} from "../api/logic/LogicManager";
import {QueryManager} from "../api/query/QueryManager";
import {ErrorMessage} from "../common/errors/ErrorMessage";
import {TypeDBClientError} from "../common/errors/TypeDBClientError";
import {RequestBuilder} from "../common/rpc/RequestBuilder";
import {Stream} from "../common/util/Stream";
import {ConceptManagerImpl} from "../concept/ConceptManagerImpl";
import {LogicManagerImpl} from "../logic/LogicManagerImpl";
import {QueryManagerImpl} from "../query/QueryManagerImpl";
import {BidirectionalStream} from "../stream/BidirectionalStream";
import {TypeDBSessionImpl} from "./TypeDBSessionImpl";
import assert = require("assert");
Comment on lines +22 to +37
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto format....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've never used assert before in Client NodeJS's production code.

Unlike in Java, there is no clean way to turn off assertions in NodeJS. (Methods typically used include hot module replacement - literally stripping the statements out of the source code before running it - and modifying the assert prototype to replace assert with a no-op function at runtime!)

I think we're better off replacing it with throw new TypeDBClientError. We've done this elsewhere (see TypeDBClientImpl.session which throws a Client NodeJS-only error if the session ID already exists).

flyingsilverfin marked this conversation as resolved.
Show resolved Hide resolved
import TRANSACTION_CLOSED = ErrorMessage.Client.TRANSACTION_CLOSED;
import TRANSACTION_CLOSED_WITH_ERRORS = ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS;
import ILLEGAL_STATE = ErrorMessage.Internal.ILLEGAL_STATE;

export class TypeDBTransactionImpl implements TypeDBTransaction.Extended {
private readonly _session: TypeDBSessionImpl;
Expand Down Expand Up @@ -105,13 +108,20 @@ export class TypeDBTransactionImpl implements TypeDBTransaction.Extended {
}

public async rpcExecute(request: Transaction.Req, batch?: boolean): Promise<Transaction.Res> {
if (!this.isOpen()) throw new TypeDBClientError(TRANSACTION_CLOSED);
if (!this.isOpen()) this.throwTransactionClosed()
const useBatch = batch !== false;
return this._bidirectionalStream.single(request, useBatch);
}

public rpcStream(request: Transaction.Req): Stream<Transaction.ResPart> {
if (!this.isOpen()) throw new TypeDBClientError(TRANSACTION_CLOSED);
if (!this.isOpen()) this.throwTransactionClosed();
return this._bidirectionalStream.stream(request);
}

private throwTransactionClosed(): void {
if (this.isOpen()) throw new TypeDBClientError(ILLEGAL_STATE);
const errors = this._bidirectionalStream.getErrors();
if (errors.length == 0) throw new TypeDBClientError(TRANSACTION_CLOSED);
else throw new TypeDBClientError(TRANSACTION_CLOSED_WITH_ERRORS.message(errors));
}
}
6 changes: 4 additions & 2 deletions connection/cluster/ClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ export class ClusterClient implements TypeDBClient.Cluster {
return this;
}

close(): void {
async close(): Promise<void> {
if (this._isOpen) {
this._isOpen = false;
Object.values(this._serverClients).forEach(client => client.close());
for (const serverClient of Object.values(this._serverClients)) {
await serverClient.close();
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions connection/cluster/ClusterServerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ export class ClusterServerClient extends TypeDBClientImpl {
return this._databases;
}

close() {
super.close();
async close(): Promise<void> {
await super.close();
this._stub.close();
}
}
4 changes: 2 additions & 2 deletions connection/core/CoreClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ export class CoreClient extends TypeDBClientImpl {
return this._stub;
}

close() {
super.close();
async close(): Promise<void> {
await super.close();
this._stub.close();
}
}
4 changes: 2 additions & 2 deletions dependencies/vaticle/artifacts.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def vaticle_typedb_artifacts():
artifact_name = "typedb-server-{platform}-{version}.{ext}",
tag_source = deployment["artifact.release"],
commit_source = deployment["artifact.snapshot"],
commit = "6ed020e52fe379d1100f64511805ed344c7a68db"
tag = "2.6.1",
)

def vaticle_typedb_cluster_artifacts():
Expand All @@ -39,5 +39,5 @@ def vaticle_typedb_cluster_artifacts():
artifact_name = "typedb-cluster-all-{platform}-{version}.{ext}",
tag_source = deployment_private["artifact.release"],
commit_source = deployment_private["artifact.snapshot"],
commit = "f6d9eae58c83c165d932c0433a3a0d858e206fab",
commit = "f6d9eae58c83c165d932c0433a3a0d858e206fab"
)
Loading