From f0fd66258b4bbc85ee6952bb908121b7726875d5 Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Wed, 29 Sep 2021 10:38:51 +0200 Subject: [PATCH] feat(core): Make event bus subscriptions transaction-safe Fixes #1107. Relates to #520. This commit introduces a new mechanism to guarantee that event subscribers do not run into issues with database transactions. It works by postponing the publishing on an event until the associated transaction has completed. Thus the subscriber can be assured that all data changes in the transaction are available to read right away. --- .../e2e/database-transactions.e2e-spec.ts | 36 ++++++++-- .../e2e/default-search-plugin.e2e-spec.ts | 8 +-- .../test-plugins/transaction-test-plugin.ts | 36 ++++++++-- .../core/src/config/config.service.mock.ts | 1 + .../core/src/connection/connection.module.ts | 6 +- .../src/connection/transaction-subscriber.ts | 67 +++++++++++++++++++ .../connection/transactional-connection.ts | 8 +-- .../core/src/event-bus/event-bus.module.ts | 3 + packages/core/src/event-bus/event-bus.ts | 54 +++++++++------ .../services/product-variant.service.ts | 3 - .../event-bus-transactions-plugin.ts | 63 +++++++++++++++++ 11 files changed, 241 insertions(+), 44 deletions(-) create mode 100644 packages/core/src/connection/transaction-subscriber.ts create mode 100644 packages/dev-server/test-plugins/event-bus-transactions-plugin.ts diff --git a/packages/core/e2e/database-transactions.e2e-spec.ts b/packages/core/e2e/database-transactions.e2e-spec.ts index c672081170..5f38704eba 100644 --- a/packages/core/e2e/database-transactions.e2e-spec.ts +++ b/packages/core/e2e/database-transactions.e2e-spec.ts @@ -2,11 +2,16 @@ import { mergeConfig } from '@vendure/core'; import { createTestEnvironment } from '@vendure/testing'; import gql from 'graphql-tag'; import path from 'path'; +import { ReplaySubject } from 'rxjs'; import { initialData } from '../../../e2e-common/e2e-initial-data'; import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config'; -import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin'; +import { + TransactionTestPlugin, + TRIGGER_ATTEMPTED_READ_EMAIL, + TRIGGER_ATTEMPTED_UPDATE_EMAIL, +} from './fixtures/test-plugins/transaction-test-plugin'; describe('Transaction infrastructure', () => { const { server, adminClient } = createTestEnvironment( @@ -104,13 +109,25 @@ describe('Transaction infrastructure', () => { // Testing https://github.com/vendure-ecommerce/vendure/issues/520 it('passing transaction via EventBus', async () => { - TransactionTestPlugin.errorHandler.mockClear(); + TransactionTestPlugin.reset(); const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, { - emailAddress: TRIGGER_EMAIL, + emailAddress: TRIGGER_ATTEMPTED_UPDATE_EMAIL, fail: false, }); await TransactionTestPlugin.eventHandlerComplete$.toPromise(); - expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL); + expect(createTestAdministrator.emailAddress).toBe(TRIGGER_ATTEMPTED_UPDATE_EMAIL); + expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled(); + }); + + // Testing https://github.com/vendure-ecommerce/vendure/issues/1107 + it('passing transaction via EventBus with delay in committing transaction', async () => { + TransactionTestPlugin.reset(); + const { createTestAdministrator4 } = await adminClient.query(CREATE_ADMIN4, { + emailAddress: TRIGGER_ATTEMPTED_READ_EMAIL, + fail: false, + }); + await TransactionTestPlugin.eventHandlerComplete$.toPromise(); + expect(createTestAdministrator4.emailAddress).toBe(TRIGGER_ATTEMPTED_READ_EMAIL); expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled(); }); }); @@ -145,7 +162,7 @@ const CREATE_ADMIN2 = gql` `; const CREATE_ADMIN3 = gql` - mutation CreateTestAdmin2($emailAddress: String!, $fail: Boolean!) { + mutation CreateTestAdmin3($emailAddress: String!, $fail: Boolean!) { createTestAdministrator3(emailAddress: $emailAddress, fail: $fail) { ...CreatedAdmin } @@ -153,6 +170,15 @@ const CREATE_ADMIN3 = gql` ${ADMIN_FRAGMENT} `; +const CREATE_ADMIN4 = gql` + mutation CreateTestAdmin4($emailAddress: String!, $fail: Boolean!) { + createTestAdministrator4(emailAddress: $emailAddress, fail: $fail) { + ...CreatedAdmin + } + } + ${ADMIN_FRAGMENT} +`; + const VERIFY_TEST = gql` query VerifyTest { verify { diff --git a/packages/core/e2e/default-search-plugin.e2e-spec.ts b/packages/core/e2e/default-search-plugin.e2e-spec.ts index f159cc2abd..f60a86d8fa 100644 --- a/packages/core/e2e/default-search-plugin.e2e-spec.ts +++ b/packages/core/e2e/default-search-plugin.e2e-spec.ts @@ -42,7 +42,7 @@ import { UpdateCollection, UpdateProduct, UpdateProductVariants, - UpdateTaxRate + UpdateTaxRate, } from './graphql/generated-e2e-admin-types'; import { LogicalOperator, SearchProductsShop } from './graphql/generated-e2e-shop-types'; import { @@ -565,7 +565,7 @@ describe('Default search plugin', () => { }, ); expect(result.search.collections).toEqual([ - {collection: {id: 'T_2', name: 'Plants',},count: 3,}, + { collection: { id: 'T_2', name: 'Plants' }, count: 3 }, ]); }); @@ -579,8 +579,8 @@ describe('Default search plugin', () => { }, ); expect(result.search.collections).toEqual([ - {collection: {id: 'T_2', name: 'Plants',},count: 3,}, - ]); + { collection: { id: 'T_2', name: 'Plants' }, count: 3 }, + ]); }); it('encodes the productId and productVariantId', async () => { diff --git a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts index 66bc8ff6c5..327d20e72f 100644 --- a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts +++ b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts @@ -23,7 +23,8 @@ export class TestEvent extends VendureEvent { } } -export const TRIGGER_EMAIL = 'trigger-email'; +export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email'; +export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email'; @Injectable() class TestUserService { @@ -99,6 +100,15 @@ class TestResolver { return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); } + @Mutation() + @Transaction() + async createTestAdministrator4(@Ctx() ctx: RequestContext, @Args() args: any) { + const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); + this.eventBus.publish(new TestEvent(ctx, admin)); + await new Promise(resolve => setTimeout(resolve, 50)); + return admin; + } + @Query() async verify() { const admins = await this.connection.getRepository(Administrator).find(); @@ -119,6 +129,7 @@ class TestResolver { createTestAdministrator(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator + createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator } type VerifyResult { admins: [Administrator!]! @@ -138,16 +149,33 @@ export class TransactionTestPlugin implements OnApplicationBootstrap { constructor(private eventBus: EventBus, private connection: TransactionalConnection) {} + static reset() { + this.eventHandlerComplete$ = new ReplaySubject(1); + this.errorHandler.mockClear(); + } + onApplicationBootstrap(): any { // This part is used to test how RequestContext with transactions behave // when used in an Event subscription this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => { const { ctx, administrator } = event; - if (administrator.emailAddress === TRIGGER_EMAIL) { + if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) { + const adminRepository = this.connection.getRepository(ctx, Administrator); + await new Promise(resolve => setTimeout(resolve, 50)); administrator.lastName = 'modified'; try { - await new Promise(resolve => setTimeout(resolve, 50)); - await this.connection.getRepository(ctx, Administrator).save(administrator); + await adminRepository.save(administrator); + } catch (e) { + TransactionTestPlugin.errorHandler(e); + } finally { + TransactionTestPlugin.eventHandlerComplete$.complete(); + } + } + if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) { + // note the ctx is not passed here, so we are not inside the ongoing transaction + const adminRepository = this.connection.getRepository(Administrator); + try { + await adminRepository.findOneOrFail(administrator.id); } catch (e) { TransactionTestPlugin.errorHandler(e); } finally { diff --git a/packages/core/src/config/config.service.mock.ts b/packages/core/src/config/config.service.mock.ts index 6758af3d39..dc7ceefb63 100644 --- a/packages/core/src/config/config.service.mock.ts +++ b/packages/core/src/config/config.service.mock.ts @@ -23,6 +23,7 @@ export class MockConfigService implements MockClass { defaultLanguageCode: jest.Mock; roundingStrategy: {}; entityIdStrategy = new MockIdStrategy(); + entityOptions = {}; assetOptions = { assetNamingStrategy: {} as any, assetStorageStrategy: {} as any, diff --git a/packages/core/src/connection/connection.module.ts b/packages/core/src/connection/connection.module.ts index 3c2c0d47f7..79f2f0a564 100644 --- a/packages/core/src/connection/connection.module.ts +++ b/packages/core/src/connection/connection.module.ts @@ -6,14 +6,14 @@ import { ConfigModule } from '../config/config.module'; import { ConfigService } from '../config/config.service'; import { TypeOrmLogger } from '../config/logger/typeorm-logger'; +import { TransactionSubscriber } from './transaction-subscriber'; import { TransactionalConnection } from './transactional-connection'; let defaultTypeOrmModule: DynamicModule; @Module({ - imports: [], - providers: [TransactionalConnection], - exports: [TransactionalConnection], + providers: [TransactionalConnection, TransactionSubscriber], + exports: [TransactionalConnection, TransactionSubscriber], }) export class ConnectionModule { static forRoot(): DynamicModule { diff --git a/packages/core/src/connection/transaction-subscriber.ts b/packages/core/src/connection/transaction-subscriber.ts new file mode 100644 index 0000000000..3d1ac2d688 --- /dev/null +++ b/packages/core/src/connection/transaction-subscriber.ts @@ -0,0 +1,67 @@ +import { Injectable } from '@nestjs/common'; +import { InjectConnection } from '@nestjs/typeorm'; +import { merge, Subject } from 'rxjs'; +import { filter, map, take } from 'rxjs/operators'; +import { Connection, EntitySubscriberInterface } from 'typeorm'; +import { EntityManager } from 'typeorm/entity-manager/EntityManager'; +import { QueryRunner } from 'typeorm/query-runner/QueryRunner'; +import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent'; +import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent'; + +export interface TransactionSubscriberEvent { + /** + * Connection used in the event. + */ + connection: Connection; + /** + * QueryRunner used in the event transaction. + * All database operations in the subscribed event listener should be performed using this query runner instance. + */ + queryRunner: QueryRunner; + /** + * EntityManager used in the event transaction. + * All database operations in the subscribed event listener should be performed using this entity manager instance. + */ + manager: EntityManager; +} + +/** + * This subscriber listens to all transaction commit/rollback events emitted by TypeORM + * so that we can be notified as soon as a particular queryRunner's transactions ends. + * + * This is used by the {@link EventBus} to prevent events from being published until their + * associated transactions are complete. + */ +@Injectable() +export class TransactionSubscriber implements EntitySubscriberInterface { + private commit$ = new Subject(); + private rollback$ = new Subject(); + + constructor(@InjectConnection() private connection: Connection) { + if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) { + connection.subscribers.push(this); + } + } + + afterTransactionCommit(event: TransactionCommitEvent) { + this.commit$.next(event); + } + + afterTransactionRollback(event: TransactionRollbackEvent) { + this.rollback$.next(event); + } + + awaitRelease(queryRunner: QueryRunner): Promise { + if (queryRunner.isTransactionActive) { + return merge(this.commit$, this.rollback$) + .pipe( + filter(event => event.queryRunner === queryRunner), + take(1), + map(event => event.queryRunner), + ) + .toPromise(); + } else { + return Promise.resolve(queryRunner); + } + } +} diff --git a/packages/core/src/connection/transactional-connection.ts b/packages/core/src/connection/transactional-connection.ts index 219d1f6ba5..c8df7f43c6 100644 --- a/packages/core/src/connection/transactional-connection.ts +++ b/packages/core/src/connection/transactional-connection.ts @@ -36,9 +36,7 @@ export interface GetEntityOrThrowOptions extends FindOneOptions { /** * @description * If set to a positive integer, it will retry getting the entity in case it is initially not - * found. This can be useful when working with the {@link EventBus} and subscribing to the - * creation of new Entities which may on first attempt be inaccessible due to an ongoing - * transaction. + * found. * * @since 1.1.0 * @default 0 @@ -157,9 +155,7 @@ export class TransactionalConnection { /** * @description * Finds an entity of the given type by ID, or throws an `EntityNotFoundError` if none - * is found. Can be configured to retry (using the `retries` option) in the event of the - * entity not being found on the first attempt. This can be useful when attempting to access - * an entity which was just created and may be inaccessible due to an ongoing transaction. + * is found. */ async getEntityOrThrow( ctx: RequestContext, diff --git a/packages/core/src/event-bus/event-bus.module.ts b/packages/core/src/event-bus/event-bus.module.ts index e79ad3cf9a..ef81333b9c 100644 --- a/packages/core/src/event-bus/event-bus.module.ts +++ b/packages/core/src/event-bus/event-bus.module.ts @@ -1,8 +1,11 @@ import { Module } from '@nestjs/common'; +import { ConnectionModule } from '../connection/connection.module'; + import { EventBus } from './event-bus'; @Module({ + imports: [ConnectionModule], providers: [EventBus], exports: [EventBus], }) diff --git a/packages/core/src/event-bus/event-bus.ts b/packages/core/src/event-bus/event-bus.ts index 9a31e27436..66ea7dc0d2 100644 --- a/packages/core/src/event-bus/event-bus.ts +++ b/packages/core/src/event-bus/event-bus.ts @@ -1,11 +1,12 @@ import { Injectable, OnModuleDestroy } from '@nestjs/common'; import { Type } from '@vendure/common/lib/shared-types'; import { Observable, Subject } from 'rxjs'; -import { filter, takeUntil } from 'rxjs/operators'; +import { filter, mergeMap, takeUntil } from 'rxjs/operators'; import { EntityManager } from 'typeorm'; import { RequestContext } from '../api/common/request-context'; import { TRANSACTION_MANAGER_KEY } from '../common/constants'; +import { TransactionSubscriber } from '../connection/transaction-subscriber'; import { VendureEvent } from './vendure-event'; @@ -57,22 +58,30 @@ export class EventBus implements OnModuleDestroy { private eventStream = new Subject(); private destroy$ = new Subject(); + constructor(private transactionSubscriber: TransactionSubscriber) {} + /** * @description * Publish an event which any subscribers can react to. */ publish(event: T): void { - this.eventStream.next(this.prepareRequestContext(event)); + this.eventStream.next(event); } /** * @description * Returns an RxJS Observable stream of events of the given type. + * If the event contains a {@link RequestContext} object, the subscriber + * will only get called after any active database transactions are complete. + * + * This means that the subscriber function can safely access all updated + * data related to the event. */ ofType(type: Type): Observable { return this.eventStream.asObservable().pipe( takeUntil(this.destroy$), filter(e => (e as any).constructor === type), + mergeMap(event => this.awaitActiveTransactions(event)), ) as Observable; } @@ -82,26 +91,33 @@ export class EventBus implements OnModuleDestroy { } /** - * If the Event includes a RequestContext property, we need to: + * If the Event includes a RequestContext property, we need to check for any active transaction + * associated with it, and if there is one, we await that transaction to either commit or rollback + * before publishing the event. + * + * The reason for this is that if the transaction is still active when event subscribers execute, + * this can cause a couple of issues: * - * 1) Set it as a copy of the original - * 2) Remove the TRANSACTION_MANAGER_KEY from that copy + * 1. If the transaction hasn't completed by the time the subscriber runs, the new data inside + * the transaction will not be available to the subscriber. + * 2. If the subscriber gets a reference to the EntityManager which has an active transaction, + * and then the transaction completes, and then the subscriber attempts a DB operation using that + * EntityManager, a fatal QueryRunnerAlreadyReleasedError will be thrown. * - * The TRANSACTION_MANAGER_KEY is used to track transactions across calls - * (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method). - * However, allowing a transaction to continue in an async event subscriber function _will_ cause - * very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why - * we simply remove the reference to the transaction manager from the context object altogether. + * For more context on these two issues, see: + * + * * https://github.com/vendure-ecommerce/vendure/issues/520 + * * https://github.com/vendure-ecommerce/vendure/issues/1107 */ - private prepareRequestContext(event: T): T { - for (const propertyName of Object.getOwnPropertyNames(event)) { - const property = event[propertyName as keyof T]; - if (property instanceof RequestContext) { - const ctxCopy = property.copy(); - delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY]; - (event[propertyName as keyof T] as any) = ctxCopy; - } + private async awaitActiveTransactions(event: T): Promise { + const ctx = Object.values(event).find(value => value instanceof RequestContext); + if (!ctx) { + return event; + } + const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY]; + if (!transactionManager?.queryRunner) { + return event; } - return event; + return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => event); } } diff --git a/packages/core/src/service/services/product-variant.service.ts b/packages/core/src/service/services/product-variant.service.ts index 9249ea63b0..467af0a63f 100644 --- a/packages/core/src/service/services/product-variant.service.ts +++ b/packages/core/src/service/services/product-variant.service.ts @@ -649,9 +649,6 @@ export class ProductVariantService { ctx, variants.map(v => v.id), ); - // Publish the events at the latest possible stage to decrease the chance of race conditions - // whereby an event listener triggers a query which does not yet have access to the changes - // within the current transaction. for (const variant of variants) { this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned')); } diff --git a/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts b/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts new file mode 100644 index 0000000000..9877e620cf --- /dev/null +++ b/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts @@ -0,0 +1,63 @@ +/* tslint:disable:no-non-null-assertion */ +import { OnModuleInit } from '@nestjs/common'; +import { Args, Mutation, Resolver } from '@nestjs/graphql'; +import { + Asset, + AssetEvent, + AssetService, + Ctx, + EventBus, + ID, + Logger, + PluginCommonModule, + RequestContext, + Transaction, + TransactionalConnection, + VendurePlugin, +} from '@vendure/core'; +import gql from 'graphql-tag'; + +@Resolver() +class TestResolver { + constructor(private assetService: AssetService) {} + + @Transaction() + @Mutation() + async setAssetName(@Ctx() ctx: RequestContext, @Args() args: { id: ID; name: string }) { + await this.assetService.update(ctx, { + id: args.id, + name: args.name, + }); + await new Promise(resolve => setTimeout(resolve, 500)); + Logger.info(`setAssetName returning`); + return true; + } +} + +// A plugin to explore solutions to https://github.com/vendure-ecommerce/vendure/issues/1107 +@VendurePlugin({ + imports: [PluginCommonModule], + adminApiExtensions: { + schema: gql` + extend type Mutation { + setAssetName(id: ID!, name: String!): Boolean + } + `, + resolvers: [TestResolver], + }, +}) +export class EventBusTransactionsPlugin implements OnModuleInit { + constructor(private eventBus: EventBus, private connection: TransactionalConnection) {} + + onModuleInit(): any { + this.eventBus.ofType(AssetEvent).subscribe(async event => { + Logger.info(`Event handler started`); + const repository = this.connection.getRepository(event.ctx, Asset); + await new Promise(resolve => setTimeout(resolve, 1000)); + const asset = await repository.findOne(event.asset.id); + Logger.info(`The asset name is ${asset?.name}`); + asset!.name = asset!.name + ' modified'; + await repository.save(asset!); + }); + } +}