diff --git a/packages/core/e2e/database-transactions.e2e-spec.ts b/packages/core/e2e/database-transactions.e2e-spec.ts index 5f38704eba..07198a5005 100644 --- a/packages/core/e2e/database-transactions.e2e-spec.ts +++ b/packages/core/e2e/database-transactions.e2e-spec.ts @@ -107,6 +107,46 @@ describe('Transaction infrastructure', () => { expect(!!verify.users.find((u: any) => u.identifier === 'test4')).toBe(true); }); + it('failing mutation inside connection.withTransaction() wrapper with request context', async () => { + try { + await adminClient.query(CREATE_ADMIN5, { + emailAddress: 'test5', + fail: true, + noContext: false, + }); + fail('Should have thrown'); + } catch (e) { + expect(e.message).toContain('Failed!'); + } + + const { verify } = await adminClient.query(VERIFY_TEST); + + expect(verify.admins.length).toBe(2); + expect(verify.users.length).toBe(3); + expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false); + expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false); + }); + + it('failing mutation inside connection.withTransaction() wrapper without request context', async () => { + try { + await adminClient.query(CREATE_ADMIN5, { + emailAddress: 'test5', + fail: true, + noContext: true, + }); + fail('Should have thrown'); + } catch (e) { + expect(e.message).toContain('Failed!'); + } + + const { verify } = await adminClient.query(VERIFY_TEST); + + expect(verify.admins.length).toBe(2); + expect(verify.users.length).toBe(3); + expect(!!verify.admins.find((a: any) => a.emailAddress === 'test5')).toBe(false); + expect(!!verify.users.find((u: any) => u.identifier === 'test5')).toBe(false); + }); + // Testing https://github.com/vendure-ecommerce/vendure/issues/520 it('passing transaction via EventBus', async () => { TransactionTestPlugin.reset(); @@ -179,6 +219,15 @@ const CREATE_ADMIN4 = gql` ${ADMIN_FRAGMENT} `; +const CREATE_ADMIN5 = gql` + mutation CreateTestAdmin5($emailAddress: String!, $fail: Boolean!, $noContext: Boolean!) { + createTestAdministrator5(emailAddress: $emailAddress, fail: $fail, noContext: $noContext) { + ...CreatedAdmin + } + } + ${ADMIN_FRAGMENT} +`; + const VERIFY_TEST = gql` query VerifyTest { verify { diff --git a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts index 327d20e72f..5e5c44a4d7 100644 --- a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts +++ b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts @@ -109,6 +109,29 @@ class TestResolver { return admin; } + @Mutation() + async createTestAdministrator5(@Ctx() ctx: RequestContext, @Args() args: any) { + if (args.noContext === true) { + return this.connection.withTransaction(ctx, async _ctx => { + const admin = await this.testAdminService.createAdministrator( + _ctx, + args.emailAddress, + args.fail, + ); + return admin; + }); + } else { + return this.connection.withTransaction(async _ctx => { + const admin = await this.testAdminService.createAdministrator( + _ctx, + args.emailAddress, + args.fail, + ); + return admin; + }); + } + } + @Query() async verify() { const admins = await this.connection.getRepository(Administrator).find(); @@ -130,6 +153,11 @@ class TestResolver { createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator + createTestAdministrator5( + emailAddress: String! + fail: Boolean! + noContext: Boolean! + ): Administrator } type VerifyResult { admins: [Administrator!]! diff --git a/packages/core/src/api/middleware/transaction-interceptor.ts b/packages/core/src/api/middleware/transaction-interceptor.ts index d943740b86..3b074f870c 100644 --- a/packages/core/src/api/middleware/transaction-interceptor.ts +++ b/packages/core/src/api/middleware/transaction-interceptor.ts @@ -1,14 +1,11 @@ import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common'; import { Reflector } from '@nestjs/core'; import { Observable, of } from 'rxjs'; -import { retryWhen, take, tap } from 'rxjs/operators'; -import { QueryRunner } from 'typeorm'; -import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError'; -import { REQUEST_CONTEXT_KEY, TRANSACTION_MANAGER_KEY } from '../../common/constants'; +import { REQUEST_CONTEXT_KEY } from '../../common/constants'; +import { TransactionWrapper } from '../../connection/transaction-wrapper'; import { TransactionalConnection } from '../../connection/transactional-connection'; import { parseContext } from '../common/parse-context'; -import { RequestContext } from '../common/request-context'; import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/transaction.decorator'; /** @@ -18,7 +15,11 @@ import { TransactionMode, TRANSACTION_MODE_METADATA_KEY } from '../decorators/tr */ @Injectable() export class TransactionInterceptor implements NestInterceptor { - constructor(private connection: TransactionalConnection, private reflector: Reflector) {} + constructor( + private connection: TransactionalConnection, + private transactionWrapper: TransactionWrapper, + private reflector: Reflector, + ) {} intercept(context: ExecutionContext, next: CallHandler): Observable { const { isGraphQL, req } = parseContext(context); const ctx = (req as any)[REQUEST_CONTEXT_KEY]; @@ -27,107 +28,16 @@ export class TransactionInterceptor implements NestInterceptor { TRANSACTION_MODE_METADATA_KEY, context.getHandler(), ); - return of(this.withTransaction(ctx, () => next.handle(), transactionMode)); + return of( + this.transactionWrapper.executeInTransaction( + ctx, + () => next.handle(), + transactionMode, + this.connection.rawConnection, + ), + ); } else { return next.handle(); } } - - /** - * @description - * Executes the `work` function within the context of a transaction. - */ - private async withTransaction( - ctx: RequestContext, - work: () => Observable, - mode: TransactionMode, - ): Promise { - const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY]; - if (queryRunnerExists) { - // If a QueryRunner already exists on the RequestContext, there must be an existing - // outer transaction in progress. In that case, we just execute the work function - // as usual without needing to further wrap in a transaction. - return work().toPromise(); - } - const queryRunner = this.connection.rawConnection.createQueryRunner(); - if (mode === 'auto') { - await this.startTransaction(queryRunner); - } - (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager; - - try { - const maxRetries = 5; - const result = await work() - .pipe( - retryWhen(errors => - errors.pipe( - tap(err => { - if (!this.isRetriableError(err)) { - throw err; - } - }), - take(maxRetries), - ), - ), - ) - .toPromise(); - if (queryRunner.isTransactionActive) { - await queryRunner.commitTransaction(); - } - return result; - } catch (error) { - if (queryRunner.isTransactionActive) { - await queryRunner.rollbackTransaction(); - } - throw error; - } finally { - if (queryRunner?.isReleased === false) { - await queryRunner.release(); - } - } - } - - /** - * Attempts to start a DB transaction, with retry logic in the case that a transaction - * is already started for the connection (which is mainly a problem with SQLite/Sql.js) - */ - private async startTransaction(queryRunner: QueryRunner) { - const maxRetries = 25; - let attempts = 0; - let lastError: any; - // Returns false if a transaction is already in progress - async function attemptStartTransaction(): Promise { - try { - await queryRunner.startTransaction(); - return true; - } catch (err) { - lastError = err; - if (err instanceof TransactionAlreadyStartedError) { - return false; - } - throw err; - } - } - while (attempts < maxRetries) { - const result = await attemptStartTransaction(); - if (result) { - return; - } - attempts++; - // insert an increasing delay before retrying - await new Promise(resolve => setTimeout(resolve, attempts * 20)); - } - throw lastError; - } - - /** - * If the resolver function throws an error, there are certain cases in which - * we want to retry the whole thing again - notably in the case of a deadlock - * situation, which can usually be retried with success. - */ - private isRetriableError(err: any): boolean { - const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK'; - const postgresDeadlock = err.code === 'deadlock_detected'; - return mysqlDeadlock || postgresDeadlock; - } } diff --git a/packages/core/src/connection/connection.module.ts b/packages/core/src/connection/connection.module.ts index 3c11aa8ace..608e92c568 100644 --- a/packages/core/src/connection/connection.module.ts +++ b/packages/core/src/connection/connection.module.ts @@ -7,14 +7,15 @@ import { ConfigService } from '../config/config.service'; import { TypeOrmLogger } from '../config/logger/typeorm-logger'; import { TransactionSubscriber } from './transaction-subscriber'; +import { TransactionWrapper } from './transaction-wrapper'; import { TransactionalConnection } from './transactional-connection'; let defaultTypeOrmModule: DynamicModule; @Module({ imports: [ConfigModule], - providers: [TransactionalConnection, TransactionSubscriber], - exports: [TransactionalConnection, TransactionSubscriber], + providers: [TransactionalConnection, TransactionSubscriber, TransactionWrapper], + exports: [TransactionalConnection, TransactionSubscriber, TransactionWrapper], }) export class ConnectionModule { static forRoot(): DynamicModule { diff --git a/packages/core/src/connection/transaction-wrapper.ts b/packages/core/src/connection/transaction-wrapper.ts new file mode 100644 index 0000000000..910110ba2b --- /dev/null +++ b/packages/core/src/connection/transaction-wrapper.ts @@ -0,0 +1,118 @@ +import { from, Observable, of } from 'rxjs'; +import { retryWhen, take, tap } from 'rxjs/operators'; +import { Connection, QueryRunner } from 'typeorm'; +import { TransactionAlreadyStartedError } from 'typeorm/error/TransactionAlreadyStartedError'; + +import { RequestContext } from '../api/common/request-context'; +import { TransactionMode } from '../api/decorators/transaction.decorator'; +import { TRANSACTION_MANAGER_KEY } from '../common/constants'; + +/** + * @description + * This helper class is used to wrap operations in a TypeORM transaction in order to ensure + * atomic operations on the database. + */ +export class TransactionWrapper { + /** + * @description + * Executes the `work` function within the context of a transaction. If the `work` function + * resolves / completes, then all the DB operations it contains will be committed. If it + * throws an error or rejects, then all DB operations will be rolled back. + */ + async executeInTransaction( + ctx: RequestContext, + work: () => Observable | Promise, + mode: TransactionMode, + connection: Connection, + ): Promise { + const queryRunnerExists = !!(ctx as any)[TRANSACTION_MANAGER_KEY]; + if (queryRunnerExists) { + // If a QueryRunner already exists on the RequestContext, there must be an existing + // outer transaction in progress. In that case, we just execute the work function + // as usual without needing to further wrap in a transaction. + return from(work()).toPromise(); + } + const queryRunner = connection.createQueryRunner(); + if (mode === 'auto') { + await this.startTransaction(queryRunner); + } + (ctx as any)[TRANSACTION_MANAGER_KEY] = queryRunner.manager; + + try { + const maxRetries = 5; + const result = await from(work()) + .pipe( + retryWhen(errors => + errors.pipe( + tap(err => { + if (!this.isRetriableError(err)) { + throw err; + } + }), + take(maxRetries), + ), + ), + ) + .toPromise(); + if (queryRunner.isTransactionActive) { + await queryRunner.commitTransaction(); + } + return result; + } catch (error) { + if (queryRunner.isTransactionActive) { + await queryRunner.rollbackTransaction(); + } + throw error; + } finally { + if (queryRunner?.isReleased === false) { + await queryRunner.release(); + } + } + } + + /** + * Attempts to start a DB transaction, with retry logic in the case that a transaction + * is already started for the connection (which is mainly a problem with SQLite/Sql.js) + */ + private async startTransaction(queryRunner: QueryRunner) { + const maxRetries = 25; + let attempts = 0; + let lastError: any; + + // Returns false if a transaction is already in progress + async function attemptStartTransaction(): Promise { + try { + await queryRunner.startTransaction(); + return true; + } catch (err) { + lastError = err; + if (err instanceof TransactionAlreadyStartedError) { + return false; + } + throw err; + } + } + + while (attempts < maxRetries) { + const result = await attemptStartTransaction(); + if (result) { + return; + } + attempts++; + // insert an increasing delay before retrying + await new Promise(resolve => setTimeout(resolve, attempts * 20)); + } + throw lastError; + } + + /** + * If the resolver function throws an error, there are certain cases in which + * we want to retry the whole thing again - notably in the case of a deadlock + * situation, which can usually be retried with success. + */ + private isRetriableError(err: any): boolean { + const mysqlDeadlock = err.code === 'ER_LOCK_DEADLOCK'; + const postgresDeadlock = err.code === 'deadlock_detected'; + return mysqlDeadlock || postgresDeadlock; + } +} diff --git a/packages/core/src/connection/transactional-connection.ts b/packages/core/src/connection/transactional-connection.ts index 9acd781bc6..4a224f8ef1 100644 --- a/packages/core/src/connection/transactional-connection.ts +++ b/packages/core/src/connection/transactional-connection.ts @@ -18,6 +18,7 @@ import { EntityNotFoundError } from '../common/error/errors'; import { ChannelAware, SoftDeletable } from '../common/types/common-types'; import { VendureEntity } from '../entity/base/base.entity'; +import { TransactionWrapper } from './transaction-wrapper'; import { GetEntityOrThrowOptions } from './types'; /** @@ -34,7 +35,10 @@ import { GetEntityOrThrowOptions } from './types'; */ @Injectable() export class TransactionalConnection { - constructor(@InjectConnection() private connection: Connection) {} + constructor( + @InjectConnection() private connection: Connection, + private transactionWrapper: TransactionWrapper, + ) {} /** * @description @@ -81,6 +85,62 @@ export class TransactionalConnection { } } + /** + * @description + * Allows database operations to be wrapped in a transaction, ensuring that in the event of an error being + * thrown at any point, the entire transaction will be rolled back and no changes will be saved. + * + * In the context of API requests, you should instead use the {@link Transaction} decorator on your resolver or + * controller method. + * + * On the other hand, for code that does not run in the context of a GraphQL/REST request, this method + * should be used to protect against non-atomic changes to the data which could leave your data in an + * inconsistent state. + * + * Such situations include function processed by the JobQueue or stand-alone scripts which make use + * of Vendure internal services. + * + * If there is already a {@link RequestContext} object available, you should pass it in as the first + * argument in order to add a new transaction to it. If not, omit the first argument and an empty + * RequestContext object will be created, which is then used to propagate the transaction to + * all inner method calls. + * + * @example + * ```TypeScript + * private async transferCredit(fromId: ID, toId: ID, amount: number) { + * await this.connection.withTransaction(ctx => { + * await this.giftCardService.updateCustomerCredit(fromId, -amount); + * + * // If some intermediate logic here throws an Error, + * // then all DB transactions will be rolled back and neither Customer's + * // credit balance will have changed. + * + * await this.giftCardService.updateCustomerCredit(toId, amount); + * }) + * } + * ``` + * + * @since v1.3.0 + */ + async withTransaction(work: (ctx: RequestContext) => Promise): Promise; + async withTransaction(ctx: RequestContext, work: (ctx: RequestContext) => Promise): Promise; + async withTransaction( + ctxOrWork: RequestContext | ((ctx: RequestContext) => Promise), + maybeWork?: (ctx: RequestContext) => Promise, + ): Promise { + let ctx: RequestContext; + let work: (ctx: RequestContext) => Promise; + if (ctxOrWork instanceof RequestContext) { + ctx = ctxOrWork; + // tslint:disable-next-line:no-non-null-assertion + work = maybeWork!; + } else { + ctx = RequestContext.empty(); + work = ctxOrWork; + } + return this.transactionWrapper.executeInTransaction(ctx, () => work(ctx), 'auto', this.rawConnection); + } + /** * @description * Manually start a transaction if one is not already in progress. This method should be used in