Skip to content

Commit

Permalink
feat(core): Expose withTransaction method on TransactionalConnection
Browse files Browse the repository at this point in the history
Closes #1129
  • Loading branch information
michaelbromley committed Oct 5, 2021
1 parent 3b3bb3b commit 861ef29
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 108 deletions.
49 changes: 49 additions & 0 deletions packages/core/e2e/database-transactions.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,46 @@ describe('Transaction infrastructure', () => {
expect(!!verify.users.find((u: any) => u.identifier === 'test4')).toBe(true);
});

it('failing mutation inside connection.withTransaction() wrapper with request context', async () => {
try {
await adminClient.query(CREATE_ADMIN5, {
emailAddress: 'test5',
fail: true,
noContext: false,
});
fail('Should have thrown');
} catch (e) {
expect(e.message).toContain('Failed!');
}

const { verify } = await adminClient.query(VERIFY_TEST);

expect(verify.admins.length).toBe(2);
expect(verify.users.length).toBe(3);
expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false);
expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
});

it('failing mutation inside connection.withTransaction() wrapper without request context', async () => {
try {
await adminClient.query(CREATE_ADMIN5, {
emailAddress: 'test5',
fail: true,
noContext: true,
});
fail('Should have thrown');
} catch (e) {
expect(e.message).toContain('Failed!');
}

const { verify } = await adminClient.query(VERIFY_TEST);

expect(verify.admins.length).toBe(2);
expect(verify.users.length).toBe(3);
expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false);
expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false);
});

// Testing https://github.com/vendure-ecommerce/vendure/issues/520
it('passing transaction via EventBus', async () => {
TransactionTestPlugin.reset();
Expand Down Expand Up @@ -179,6 +219,15 @@ const CREATE_ADMIN4 = gql`
${ADMIN_FRAGMENT}
`;

const CREATE_ADMIN5 = gql`
mutation CreateTestAdmin5($emailAddress: String!, $fail: Boolean!, $noContext: Boolean!) {
createTestAdministrator5(emailAddress: $emailAddress, fail: $fail, noContext: $noContext) {
...CreatedAdmin
}
}
${ADMIN_FRAGMENT}
`;

const VERIFY_TEST = gql`
query VerifyTest {
verify {
Expand Down
28 changes: 28 additions & 0 deletions packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,29 @@ class TestResolver {
return admin;
}

@Mutation()
async createTestAdministrator5(@Ctx() ctx: RequestContext, @Args() args: any) {
if (args.noContext === true) {
return this.connection.withTransaction(ctx, async _ctx => {
const admin = await this.testAdminService.createAdministrator(
_ctx,
args.emailAddress,
args.fail,
);
return admin;
});
} else {
return this.connection.withTransaction(async _ctx => {
const admin = await this.testAdminService.createAdministrator(
_ctx,
args.emailAddress,
args.fail,
);
return admin;
});
}
}

@Query()
async verify() {
const admins = await this.connection.getRepository(Administrator).find();
Expand All @@ -130,6 +153,11 @@ class TestResolver {
createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator5(
emailAddress: String!
fail: Boolean!
noContext: Boolean!
): Administrator
}
type VerifyResult {
admins: [Administrator!]!
Expand Down
120 changes: 15 additions & 105 deletions packages/core/src/api/middleware/transaction-interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { Observable, of } from 'rxjs';
import { retryWhen, take, tap } from 'rxjs/operators';
import { QueryRunner } from 'typeorm';
import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';

import { REQUEST_CONTEXT_KEY, TRANSACTION_MANAGER_KEY } from '../../common/constants';
import { REQUEST_CONTEXT_KEY } from '../../common/constants';
import { TransactionWrapper } from '../../connection/transaction-wrapper';
import { TransactionalConnection } from '../../connection/transactional-connection';
import { parseContext } from '../common/parse-context';
import { RequestContext } from '../common/request-context';
import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/transaction.decorator';

/**
Expand All @@ -18,7 +15,11 @@ import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/tr
*/
@Injectable()
export class TransactionInterceptor implements NestInterceptor {
constructor(private connection: TransactionalConnection, private reflector: Reflector) {}
constructor(
private connection: TransactionalConnection,
private transactionWrapper: TransactionWrapper,
private reflector: Reflector,
) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const { isGraphQL, req } = parseContext(context);
const ctx = (req as any)[REQUEST_CONTEXT_KEY];
Expand All @@ -27,107 +28,16 @@ export class TransactionInterceptor implements NestInterceptor {
TRANSACTION_MODE_METADATA_KEY,
context.getHandler(),
);
return of(this.withTransaction(ctx, () => next.handle(), transactionMode));
return of(
this.transactionWrapper.executeInTransaction(
ctx,
() => next.handle(),
transactionMode,
this.connection.rawConnection,
),
);
} else {
return next.handle();
}
}

/**
* @description
* Executes the `work` function within the context of a transaction.
*/
private async withTransaction<T>(
ctx: RequestContext,
work: () => Observable<T>,
mode: TransactionMode,
): Promise<T> {
const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
if (queryRunnerExists) {
// If a QueryRunner already exists on the RequestContext, there must be an existing
// outer transaction in progress. In that case, we just execute the work function
// as usual without needing to further wrap in a transaction.
return work().toPromise();
}
const queryRunner = this.connection.rawConnection.createQueryRunner();
if (mode === 'auto') {
await this.startTransaction(queryRunner);
}
(ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;

try {
const maxRetries = 5;
const result = await work()
.pipe(
retryWhen(errors =>
errors.pipe(
tap(err => {
if (!this.isRetriableError(err)) {
throw err;
}
}),
take(maxRetries),
),
),
)
.toPromise();
if (queryRunner.isTransactionActive) {
await queryRunner.commitTransaction();
}
return result;
} catch (error) {
if (queryRunner.isTransactionActive) {
await queryRunner.rollbackTransaction();
}
throw error;
} finally {
if (queryRunner?.isReleased === false) {
await queryRunner.release();
}
}
}

/**
* Attempts to start a DB transaction, with retry logic in the case that a transaction
* is already started for the connection (which is mainly a problem with SQLite/Sql.js)
*/
private async startTransaction(queryRunner: QueryRunner) {
const maxRetries = 25;
let attempts = 0;
let lastError: any;
// Returns false if a transaction is already in progress
async function attemptStartTransaction(): Promise<boolean> {
try {
await queryRunner.startTransaction();
return true;
} catch (err) {
lastError = err;
if (err instanceof TransactionAlreadyStartedError) {
return false;
}
throw err;
}
}
while (attempts < maxRetries) {
const result = await attemptStartTransaction();
if (result) {
return;
}
attempts++;
// insert an increasing delay before retrying
await new Promise(resolve => setTimeout(resolve, attempts * 20));
}
throw lastError;
}

/**
* If the resolver function throws an error, there are certain cases in which
* we want to retry the whole thing again - notably in the case of a deadlock
* situation, which can usually be retried with success.
*/
private isRetriableError(err: any): boolean {
const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
const postgresDeadlock = err.code === 'deadlock_detected';
return mysqlDeadlock || postgresDeadlock;
}
}
5 changes: 3 additions & 2 deletions packages/core/src/connection/connection.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import { ConfigService } from '../config/config.service';
import { TypeOrmLogger } from '../config/logger/typeorm-logger';

import { TransactionSubscriber } from './transaction-subscriber';
import { TransactionWrapper } from './transaction-wrapper';
import { TransactionalConnection } from './transactional-connection';

let defaultTypeOrmModule: DynamicModule;

@Module({
imports: [ConfigModule],
providers: [TransactionalConnection, TransactionSubscriber],
exports: [TransactionalConnection, TransactionSubscriber],
providers: [TransactionalConnection, TransactionSubscriber, TransactionWrapper],
exports: [TransactionalConnection, TransactionSubscriber, TransactionWrapper],
})
export class ConnectionModule {
static forRoot(): DynamicModule {
Expand Down
118 changes: 118 additions & 0 deletions packages/core/src/connection/transaction-wrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { from, Observable, of } from 'rxjs';
import { retryWhen, take, tap } from 'rxjs/operators';
import { Connection, QueryRunner } from 'typeorm';
import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError';

import { RequestContext } from '../api/common/request-context';
import { TransactionMode } from '../api/decorators/transaction.decorator';
import { TRANSACTION_MANAGER_KEY } from '../common/constants';

/**
* @description
* This helper class is used to wrap operations in a TypeORM transaction in order to ensure
* atomic operations on the database.
*/
export class TransactionWrapper {
/**
* @description
* Executes the `work` function within the context of a transaction. If the `work` function
* resolves / completes, then all the DB operations it contains will be committed. If it
* throws an error or rejects, then all DB operations will be rolled back.
*/
async executeInTransaction<T>(
ctx: RequestContext,
work: () => Observable<T> | Promise<T>,
mode: TransactionMode,
connection: Connection,
): Promise<T> {
const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY];
if (queryRunnerExists) {
// If a QueryRunner already exists on the RequestContext, there must be an existing
// outer transaction in progress. In that case, we just execute the work function
// as usual without needing to further wrap in a transaction.
return from(work()).toPromise();
}
const queryRunner = connection.createQueryRunner();
if (mode === 'auto') {
await this.startTransaction(queryRunner);
}
(ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager;

try {
const maxRetries = 5;
const result = await from(work())
.pipe(
retryWhen(errors =>
errors.pipe(
tap(err => {
if (!this.isRetriableError(err)) {
throw err;
}
}),
take(maxRetries),
),
),
)
.toPromise();
if (queryRunner.isTransactionActive) {
await queryRunner.commitTransaction();
}
return result;
} catch (error) {
if (queryRunner.isTransactionActive) {
await queryRunner.rollbackTransaction();
}
throw error;
} finally {
if (queryRunner?.isReleased === false) {
await queryRunner.release();
}
}
}

/**
* Attempts to start a DB transaction, with retry logic in the case that a transaction
* is already started for the connection (which is mainly a problem with SQLite/Sql.js)
*/
private async startTransaction(queryRunner: QueryRunner) {
const maxRetries = 25;
let attempts = 0;
let lastError: any;

// Returns false if a transaction is already in progress
async function attemptStartTransaction(): Promise<boolean> {
try {
await queryRunner.startTransaction();
return true;
} catch (err) {
lastError = err;
if (err instanceof TransactionAlreadyStartedError) {
return false;
}
throw err;
}
}

while (attempts < maxRetries) {
const result = await attemptStartTransaction();
if (result) {
return;
}
attempts++;
// insert an increasing delay before retrying
await new Promise(resolve => setTimeout(resolve, attempts * 20));
}
throw lastError;
}

/**
* If the resolver function throws an error, there are certain cases in which
* we want to retry the whole thing again - notably in the case of a deadlock
* situation, which can usually be retried with success.
*/
private isRetriableError(err: any): boolean {
const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK';
const postgresDeadlock = err.code === 'deadlock_detected';
return mysqlDeadlock || postgresDeadlock;
}
}
Loading

0 comments on commit 861ef29

Please sign in to comment.