diff --git a/packages/core/e2e/database-transactions.e2e-spec.ts b/packages/core/e2e/database-transactions.e2e-spec.ts index c672081170..5f38704eba 100644 --- a/packages/core/e2e/database-transactions.e2e-spec.ts +++ b/packages/core/e2e/database-transactions.e2e-spec.ts @@ -2,11 +2,16 @@ import { mergeConfig } from '@vendure/core'; import { createTestEnvironment } from '@vendure/testing'; import gql from 'graphql-tag'; import path from 'path'; +import { ReplaySubject } from 'rxjs'; import { initialData } from '../../../e2e-common/e2e-initial-data'; import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config'; -import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin'; +import { + TransactionTestPlugin, + TRIGGER_ATTEMPTED_READ_EMAIL, + TRIGGER_ATTEMPTED_UPDATE_EMAIL, +} from './fixtures/test-plugins/transaction-test-plugin'; describe('Transaction infrastructure', () => { const { server, adminClient } = createTestEnvironment( @@ -104,13 +109,25 @@ describe('Transaction infrastructure', () => { // Testing https://github.com/vendure-ecommerce/vendure/issues/520 it('passing transaction via EventBus', async () => { - TransactionTestPlugin.errorHandler.mockClear(); + TransactionTestPlugin.reset(); const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, { - emailAddress: TRIGGER_EMAIL, + emailAddress: TRIGGER_ATTEMPTED_UPDATE_EMAIL, fail: false, }); await TransactionTestPlugin.eventHandlerComplete$.toPromise(); - expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL); + expect(createTestAdministrator.emailAddress).toBe(TRIGGER_ATTEMPTED_UPDATE_EMAIL); + expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled(); + }); + + // Testing https://github.com/vendure-ecommerce/vendure/issues/1107 + it('passing transaction via EventBus with delay in committing transaction', async () => { + TransactionTestPlugin.reset(); + const { createTestAdministrator4 } = await adminClient.query(CREATE_ADMIN4, { + emailAddress: TRIGGER_ATTEMPTED_READ_EMAIL, + fail: false, + }); + await TransactionTestPlugin.eventHandlerComplete$.toPromise(); + expect(createTestAdministrator4.emailAddress).toBe(TRIGGER_ATTEMPTED_READ_EMAIL); expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled(); }); }); @@ -145,7 +162,7 @@ const CREATE_ADMIN2 = gql` `; const CREATE_ADMIN3 = gql` - mutation CreateTestAdmin2($emailAddress: String!, $fail: Boolean!) { + mutation CreateTestAdmin3($emailAddress: String!, $fail: Boolean!) { createTestAdministrator3(emailAddress: $emailAddress, fail: $fail) { ...CreatedAdmin } @@ -153,6 +170,15 @@ const CREATE_ADMIN3 = gql` ${ADMIN_FRAGMENT} `; +const CREATE_ADMIN4 = gql` + mutation CreateTestAdmin4($emailAddress: String!, $fail: Boolean!) { + createTestAdministrator4(emailAddress: $emailAddress, fail: $fail) { + ...CreatedAdmin + } + } + ${ADMIN_FRAGMENT} +`; + const VERIFY_TEST = gql` query VerifyTest { verify { diff --git a/packages/core/e2e/default-search-plugin.e2e-spec.ts b/packages/core/e2e/default-search-plugin.e2e-spec.ts index f159cc2abd..f60a86d8fa 100644 --- a/packages/core/e2e/default-search-plugin.e2e-spec.ts +++ b/packages/core/e2e/default-search-plugin.e2e-spec.ts @@ -42,7 +42,7 @@ import { UpdateCollection, UpdateProduct, UpdateProductVariants, - UpdateTaxRate + UpdateTaxRate, } from './graphql/generated-e2e-admin-types'; import { LogicalOperator, SearchProductsShop } from './graphql/generated-e2e-shop-types'; import { @@ -565,7 +565,7 @@ describe('Default search plugin', () => { }, ); expect(result.search.collections).toEqual([ - {collection: {id: 'T_2', name: 'Plants',},count: 3,}, + { collection: { id: 'T_2', name: 'Plants' }, count: 3 }, ]); }); @@ -579,8 +579,8 @@ describe('Default search plugin', () => { }, ); expect(result.search.collections).toEqual([ - {collection: {id: 'T_2', name: 'Plants',},count: 3,}, - ]); + { collection: { id: 'T_2', name: 'Plants' }, count: 3 }, + ]); }); it('encodes the productId and productVariantId', async () => { diff --git a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts index 66bc8ff6c5..327d20e72f 100644 --- a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts +++ b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts @@ -23,7 +23,8 @@ export class TestEvent extends VendureEvent { } } -export const TRIGGER_EMAIL = 'trigger-email'; +export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email'; +export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email'; @Injectable() class TestUserService { @@ -99,6 +100,15 @@ class TestResolver { return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); } + @Mutation() + @Transaction() + async createTestAdministrator4(@Ctx() ctx: RequestContext, @Args() args: any) { + const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); + this.eventBus.publish(new TestEvent(ctx, admin)); + await new Promise(resolve => setTimeout(resolve, 50)); + return admin; + } + @Query() async verify() { const admins = await this.connection.getRepository(Administrator).find(); @@ -119,6 +129,7 @@ class TestResolver { createTestAdministrator(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator + createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator } type VerifyResult { admins: [Administrator!]! @@ -138,16 +149,33 @@ export class TransactionTestPlugin implements OnApplicationBootstrap { constructor(private eventBus: EventBus, private connection: TransactionalConnection) {} + static reset() { + this.eventHandlerComplete$ = new ReplaySubject(1); + this.errorHandler.mockClear(); + } + onApplicationBootstrap(): any { // This part is used to test how RequestContext with transactions behave // when used in an Event subscription this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => { const { ctx, administrator } = event; - if (administrator.emailAddress === TRIGGER_EMAIL) { + if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) { + const adminRepository = this.connection.getRepository(ctx, Administrator); + await new Promise(resolve => setTimeout(resolve, 50)); administrator.lastName = 'modified'; try { - await new Promise(resolve => setTimeout(resolve, 50)); - await this.connection.getRepository(ctx, Administrator).save(administrator); + await adminRepository.save(administrator); + } catch (e) { + TransactionTestPlugin.errorHandler(e); + } finally { + TransactionTestPlugin.eventHandlerComplete$.complete(); + } + } + if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) { + // note the ctx is not passed here, so we are not inside the ongoing transaction + const adminRepository = this.connection.getRepository(Administrator); + try { + await adminRepository.findOneOrFail(administrator.id); } catch (e) { TransactionTestPlugin.errorHandler(e); } finally { diff --git a/packages/core/src/config/config.service.mock.ts b/packages/core/src/config/config.service.mock.ts index 6758af3d39..dc7ceefb63 100644 --- a/packages/core/src/config/config.service.mock.ts +++ b/packages/core/src/config/config.service.mock.ts @@ -23,6 +23,7 @@ export class MockConfigService implements MockClass { defaultLanguageCode: jest.Mock; roundingStrategy: {}; entityIdStrategy = new MockIdStrategy(); + entityOptions = {}; assetOptions = { assetNamingStrategy: {} as any, assetStorageStrategy: {} as any, diff --git a/packages/core/src/connection/connection.module.ts b/packages/core/src/connection/connection.module.ts index 3c2c0d47f7..79f2f0a564 100644 --- a/packages/core/src/connection/connection.module.ts +++ b/packages/core/src/connection/connection.module.ts @@ -6,14 +6,14 @@ import { ConfigModule } from '../config/config.module'; import { ConfigService } from '../config/config.service'; import { TypeOrmLogger } from '../config/logger/typeorm-logger'; +import { TransactionSubscriber } from './transaction-subscriber'; import { TransactionalConnection } from './transactional-connection'; let defaultTypeOrmModule: DynamicModule; @Module({ - imports: [], - providers: [TransactionalConnection], - exports: [TransactionalConnection], + providers: [TransactionalConnection, TransactionSubscriber], + exports: [TransactionalConnection, TransactionSubscriber], }) export class ConnectionModule { static forRoot(): DynamicModule { diff --git a/packages/core/src/connection/transaction-subscriber.ts b/packages/core/src/connection/transaction-subscriber.ts new file mode 100644 index 0000000000..3d1ac2d688 --- /dev/null +++ b/packages/core/src/connection/transaction-subscriber.ts @@ -0,0 +1,67 @@ +import { Injectable } from '@nestjs/common'; +import { InjectConnection } from '@nestjs/typeorm'; +import { merge, Subject } from 'rxjs'; +import { filter, map, take } from 'rxjs/operators'; +import { Connection, EntitySubscriberInterface } from 'typeorm'; +import { EntityManager } from 'typeorm/entity-manager/EntityManager'; +import { QueryRunner } from 'typeorm/query-runner/QueryRunner'; +import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent'; +import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent'; + +export interface TransactionSubscriberEvent { + /** + * Connection used in the event. + */ + connection: Connection; + /** + * QueryRunner used in the event transaction. + * All database operations in the subscribed event listener should be performed using this query runner instance. + */ + queryRunner: QueryRunner; + /** + * EntityManager used in the event transaction. + * All database operations in the subscribed event listener should be performed using this entity manager instance. + */ + manager: EntityManager; +} + +/** + * This subscriber listens to all transaction commit/rollback events emitted by TypeORM + * so that we can be notified as soon as a particular queryRunner's transactions ends. + * + * This is used by the {@link EventBus} to prevent events from being published until their + * associated transactions are complete. + */ +@Injectable() +export class TransactionSubscriber implements EntitySubscriberInterface { + private commit$ = new Subject(); + private rollback$ = new Subject(); + + constructor(@InjectConnection() private connection: Connection) { + if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) { + connection.subscribers.push(this); + } + } + + afterTransactionCommit(event: TransactionCommitEvent) { + this.commit$.next(event); + } + + afterTransactionRollback(event: TransactionRollbackEvent) { + this.rollback$.next(event); + } + + awaitRelease(queryRunner: QueryRunner): Promise { + if (queryRunner.isTransactionActive) { + return merge(this.commit$, this.rollback$) + .pipe( + filter(event => event.queryRunner === queryRunner), + take(1), + map(event => event.queryRunner), + ) + .toPromise(); + } else { + return Promise.resolve(queryRunner); + } + } +} diff --git a/packages/core/src/connection/transactional-connection.ts b/packages/core/src/connection/transactional-connection.ts index 219d1f6ba5..c8df7f43c6 100644 --- a/packages/core/src/connection/transactional-connection.ts +++ b/packages/core/src/connection/transactional-connection.ts @@ -36,9 +36,7 @@ export interface GetEntityOrThrowOptions extends FindOneOptions { /** * @description * If set to a positive integer, it will retry getting the entity in case it is initially not - * found. This can be useful when working with the {@link EventBus} and subscribing to the - * creation of new Entities which may on first attempt be inaccessible due to an ongoing - * transaction. + * found. * * @since 1.1.0 * @default 0 @@ -157,9 +155,7 @@ export class TransactionalConnection { /** * @description * Finds an entity of the given type by ID, or throws an `EntityNotFoundError` if none - * is found. Can be configured to retry (using the `retries` option) in the event of the - * entity not being found on the first attempt. This can be useful when attempting to access - * an entity which was just created and may be inaccessible due to an ongoing transaction. + * is found. */ async getEntityOrThrow( ctx: RequestContext, diff --git a/packages/core/src/event-bus/event-bus.module.ts b/packages/core/src/event-bus/event-bus.module.ts index e79ad3cf9a..ef81333b9c 100644 --- a/packages/core/src/event-bus/event-bus.module.ts +++ b/packages/core/src/event-bus/event-bus.module.ts @@ -1,8 +1,11 @@ import { Module } from '@nestjs/common'; +import { ConnectionModule } from '../connection/connection.module'; + import { EventBus } from './event-bus'; @Module({ + imports: [ConnectionModule], providers: [EventBus], exports: [EventBus], }) diff --git a/packages/core/src/event-bus/event-bus.ts b/packages/core/src/event-bus/event-bus.ts index 9a31e27436..66ea7dc0d2 100644 --- a/packages/core/src/event-bus/event-bus.ts +++ b/packages/core/src/event-bus/event-bus.ts @@ -1,11 +1,12 @@ import { Injectable, OnModuleDestroy } from '@nestjs/common'; import { Type } from '@vendure/common/lib/shared-types'; import { Observable, Subject } from 'rxjs'; -import { filter, takeUntil } from 'rxjs/operators'; +import { filter, mergeMap, takeUntil } from 'rxjs/operators'; import { EntityManager } from 'typeorm'; import { RequestContext } from '../api/common/request-context'; import { TRANSACTION_MANAGER_KEY } from '../common/constants'; +import { TransactionSubscriber } from '../connection/transaction-subscriber'; import { VendureEvent } from './vendure-event'; @@ -57,22 +58,30 @@ export class EventBus implements OnModuleDestroy { private eventStream = new Subject(); private destroy$ = new Subject(); + constructor(private transactionSubscriber: TransactionSubscriber) {} + /** * @description * Publish an event which any subscribers can react to. */ publish(event: T): void { - this.eventStream.next(this.prepareRequestContext(event)); + this.eventStream.next(event); } /** * @description * Returns an RxJS Observable stream of events of the given type. + * If the event contains a {@link RequestContext} object, the subscriber + * will only get called after any active database transactions are complete. + * + * This means that the subscriber function can safely access all updated + * data related to the event. */ ofType(type: Type): Observable { return this.eventStream.asObservable().pipe( takeUntil(this.destroy$), filter(e => (e as any).constructor === type), + mergeMap(event => this.awaitActiveTransactions(event)), ) as Observable; } @@ -82,26 +91,33 @@ export class EventBus implements OnModuleDestroy { } /** - * If the Event includes a RequestContext property, we need to: + * If the Event includes a RequestContext property, we need to check for any active transaction + * associated with it, and if there is one, we await that transaction to either commit or rollback + * before publishing the event. + * + * The reason for this is that if the transaction is still active when event subscribers execute, + * this can cause a couple of issues: * - * 1) Set it as a copy of the original - * 2) Remove the TRANSACTION_MANAGER_KEY from that copy + * 1. If the transaction hasn't completed by the time the subscriber runs, the new data inside + * the transaction will not be available to the subscriber. + * 2. If the subscriber gets a reference to the EntityManager which has an active transaction, + * and then the transaction completes, and then the subscriber attempts a DB operation using that + * EntityManager, a fatal QueryRunnerAlreadyReleasedError will be thrown. * - * The TRANSACTION_MANAGER_KEY is used to track transactions across calls - * (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method). - * However, allowing a transaction to continue in an async event subscriber function _will_ cause - * very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why - * we simply remove the reference to the transaction manager from the context object altogether. + * For more context on these two issues, see: + * + * * https://github.com/vendure-ecommerce/vendure/issues/520 + * * https://github.com/vendure-ecommerce/vendure/issues/1107 */ - private prepareRequestContext(event: T): T { - for (const propertyName of Object.getOwnPropertyNames(event)) { - const property = event[propertyName as keyof T]; - if (property instanceof RequestContext) { - const ctxCopy = property.copy(); - delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY]; - (event[propertyName as keyof T] as any) = ctxCopy; - } + private async awaitActiveTransactions(event: T): Promise { + const ctx = Object.values(event).find(value => value instanceof RequestContext); + if (!ctx) { + return event; + } + const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY]; + if (!transactionManager?.queryRunner) { + return event; } - return event; + return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => event); } } diff --git a/packages/core/src/service/services/product-variant.service.ts b/packages/core/src/service/services/product-variant.service.ts index 9249ea63b0..467af0a63f 100644 --- a/packages/core/src/service/services/product-variant.service.ts +++ b/packages/core/src/service/services/product-variant.service.ts @@ -649,9 +649,6 @@ export class ProductVariantService { ctx, variants.map(v => v.id), ); - // Publish the events at the latest possible stage to decrease the chance of race conditions - // whereby an event listener triggers a query which does not yet have access to the changes - // within the current transaction. for (const variant of variants) { this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned')); } diff --git a/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts b/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts new file mode 100644 index 0000000000..9877e620cf --- /dev/null +++ b/packages/dev-server/test-plugins/event-bus-transactions-plugin.ts @@ -0,0 +1,63 @@ +/* tslint:disable:no-non-null-assertion */ +import { OnModuleInit } from '@nestjs/common'; +import { Args, Mutation, Resolver } from '@nestjs/graphql'; +import { + Asset, + AssetEvent, + AssetService, + Ctx, + EventBus, + ID, + Logger, + PluginCommonModule, + RequestContext, + Transaction, + TransactionalConnection, + VendurePlugin, +} from '@vendure/core'; +import gql from 'graphql-tag'; + +@Resolver() +class TestResolver { + constructor(private assetService: AssetService) {} + + @Transaction() + @Mutation() + async setAssetName(@Ctx() ctx: RequestContext, @Args() args: { id: ID; name: string }) { + await this.assetService.update(ctx, { + id: args.id, + name: args.name, + }); + await new Promise(resolve => setTimeout(resolve, 500)); + Logger.info(`setAssetName returning`); + return true; + } +} + +// A plugin to explore solutions to https://github.com/vendure-ecommerce/vendure/issues/1107 +@VendurePlugin({ + imports: [PluginCommonModule], + adminApiExtensions: { + schema: gql` + extend type Mutation { + setAssetName(id: ID!, name: String!): Boolean + } + `, + resolvers: [TestResolver], + }, +}) +export class EventBusTransactionsPlugin implements OnModuleInit { + constructor(private eventBus: EventBus, private connection: TransactionalConnection) {} + + onModuleInit(): any { + this.eventBus.ofType(AssetEvent).subscribe(async event => { + Logger.info(`Event handler started`); + const repository = this.connection.getRepository(event.ctx, Asset); + await new Promise(resolve => setTimeout(resolve, 1000)); + const asset = await repository.findOne(event.asset.id); + Logger.info(`The asset name is ${asset?.name}`); + asset!.name = asset!.name + ' modified'; + await repository.save(asset!); + }); + } +}