From 739e56cb7730babd41e4b336924df4ff3f5f4c1e Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Tue, 10 Nov 2020 12:07:39 +0100 Subject: [PATCH] fix(core): Mitigate QueryRunnerAlreadyReleasedError in EventBus handlers Fixes #520 --- .../e2e/database-transactions.e2e-spec.ts | 14 +- .../test-plugins/transaction-test-plugin.ts | 52 +++++- .../src/api/common/request-context.spec.ts | 162 ++++++++++++------ .../core/src/api/common/request-context.ts | 20 ++- packages/core/src/event-bus/event-bus.ts | 35 +++- 5 files changed, 217 insertions(+), 66 deletions(-) diff --git a/packages/core/e2e/database-transactions.e2e-spec.ts b/packages/core/e2e/database-transactions.e2e-spec.ts index 8647df800d..c672081170 100644 --- a/packages/core/e2e/database-transactions.e2e-spec.ts +++ b/packages/core/e2e/database-transactions.e2e-spec.ts @@ -6,7 +6,7 @@ import path from 'path'; import { initialData } from '../../../e2e-common/e2e-initial-data'; import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config'; -import { TransactionTestPlugin } from './fixtures/test-plugins/transaction-test-plugin'; +import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin'; describe('Transaction infrastructure', () => { const { server, adminClient } = createTestEnvironment( @@ -101,6 +101,18 @@ describe('Transaction infrastructure', () => { expect(!!verify.admins.find((a: any) => a.emailAddress === 'test4')).toBe(false); expect(!!verify.users.find((u: any) => u.identifier === 'test4')).toBe(true); }); + + // Testing https://github.com/vendure-ecommerce/vendure/issues/520 + it('passing transaction via EventBus', async () => { + TransactionTestPlugin.errorHandler.mockClear(); + const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, { + emailAddress: TRIGGER_EMAIL, + fail: false, + }); + await TransactionTestPlugin.eventHandlerComplete$.toPromise(); + expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL); + expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled(); + }); }); const ADMIN_FRAGMENT = gql` diff --git a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts index 3643763e42..7ee2f09063 100644 --- a/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts +++ b/packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts @@ -1,8 +1,9 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Args, Mutation, Query, Resolver } from '@nestjs/graphql'; import { Administrator, Ctx, + EventBus, InternalServerError, NativeAuthenticationMethod, PluginCommonModule, @@ -10,9 +11,19 @@ import { Transaction, TransactionalConnection, User, + VendureEvent, VendurePlugin, } from '@vendure/core'; import gql from 'graphql-tag'; +import { ReplaySubject, Subscription } from 'rxjs'; + +export class TestEvent extends VendureEvent { + constructor(public ctx: RequestContext, public administrator: Administrator) { + super(); + } +} + +export const TRIGGER_EMAIL = 'trigger-email'; @Injectable() class TestUserService { @@ -60,12 +71,18 @@ class TestAdminService { @Resolver() class TestResolver { - constructor(private testAdminService: TestAdminService, private connection: TransactionalConnection) {} + constructor( + private testAdminService: TestAdminService, + private connection: TransactionalConnection, + private eventBus: EventBus, + ) {} @Mutation() @Transaction() - createTestAdministrator(@Ctx() ctx: RequestContext, @Args() args: any) { - return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); + async createTestAdministrator(@Ctx() ctx: RequestContext, @Args() args: any) { + const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail); + this.eventBus.publish(new TestEvent(ctx, admin)); + return admin; } @Mutation() @@ -114,4 +131,29 @@ class TestResolver { resolvers: [TestResolver], }, }) -export class TransactionTestPlugin {} +export class TransactionTestPlugin implements OnApplicationBootstrap { + private subscription: Subscription; + static errorHandler = jest.fn(); + static eventHandlerComplete$ = new ReplaySubject(1); + + constructor(private eventBus: EventBus, private connection: TransactionalConnection) {} + + onApplicationBootstrap(): any { + // This part is used to test how RequestContext with transactions behave + // when used in an Event subscription + this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => { + const { ctx, administrator } = event; + if (administrator.emailAddress === TRIGGER_EMAIL) { + administrator.lastName = 'modified'; + try { + await new Promise(resolve => setTimeout(resolve, 1)); + await this.connection.getRepository(ctx, Administrator).save(administrator); + } catch (e) { + TransactionTestPlugin.errorHandler(e); + } finally { + TransactionTestPlugin.eventHandlerComplete$.complete(); + } + } + }); + } +} diff --git a/packages/core/src/api/common/request-context.spec.ts b/packages/core/src/api/common/request-context.spec.ts index 8998c912e2..f3e35826e1 100644 --- a/packages/core/src/api/common/request-context.spec.ts +++ b/packages/core/src/api/common/request-context.spec.ts @@ -9,97 +9,147 @@ import { Zone } from '../../entity/zone/zone.entity'; import { RequestContext, SerializedRequestContext } from './request-context'; describe('RequestContext', () => { - describe('fromObject()', () => { + describe('serialize/deserialize', () => { + let serializedCtx: SerializedRequestContext; let original: RequestContext; - let ctxObject: SerializedRequestContext; - let session: CachedSession; - let channel: Channel; - let activeOrder: Order; - let zone: Zone; beforeAll(() => { - activeOrder = new Order({ - id: '55555', - active: true, - code: 'ADAWDJAWD', - }); - session = { - cacheExpiry: Number.MAX_SAFE_INTEGER, - expires: new Date(), - id: '1234', - token: '2d37187e9e8fc47807fe4f58ca', - activeOrderId: '123', - user: { - id: '8833774', - identifier: 'user', - verified: true, - channelPermissions: [], - }, - }; - zone = new Zone({ - id: '62626', - name: 'Europe', - }); - channel = new Channel({ - token: 'oiajwodij09au3r', - id: '995859', - code: '__default_channel__', - currencyCode: CurrencyCode.EUR, - pricesIncludeTax: true, - defaultLanguageCode: LanguageCode.en, - defaultShippingZone: zone, - defaultTaxZone: zone, - }); - original = new RequestContext({ - apiType: 'admin', - languageCode: LanguageCode.en, - channel, - session, - isAuthorized: true, - authorizedAsOwnerOnly: false, - }); - - ctxObject = original.serialize(); + original = createRequestContext(); + serializedCtx = original.serialize(); }); it('apiType', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.apiType).toBe(original.apiType); }); it('channelId', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.channelId).toBe(original.channelId); }); it('languageCode', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.languageCode).toBe(original.languageCode); }); it('activeUserId', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.activeUserId).toBe(original.activeUserId); }); it('isAuthorized', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.isAuthorized).toBe(original.isAuthorized); }); it('authorizedAsOwnerOnly', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.authorizedAsOwnerOnly).toBe(original.authorizedAsOwnerOnly); }); it('channel', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.channel).toEqual(original.channel); }); it('session', () => { - const result = RequestContext.deserialize(ctxObject); + const result = RequestContext.deserialize(serializedCtx); expect(result.session).toEqual(original.session); }); }); + + describe('copy', () => { + let original: RequestContext; + + beforeAll(() => { + original = createRequestContext(); + }); + + it('is a RequestContext instance', () => { + const copy = original.copy(); + expect(copy instanceof RequestContext).toBe(true); + }); + + it('is not identical to original', () => { + const copy = original.copy(); + expect(copy === original).toBe(false); + }); + + it('getters work', () => { + const copy = original.copy(); + + expect(copy.apiType).toEqual(original.apiType); + expect(copy.channelId).toEqual(original.channelId); + expect(copy.languageCode).toEqual(original.languageCode); + expect(copy.activeUserId).toEqual(original.activeUserId); + expect(copy.isAuthorized).toEqual(original.isAuthorized); + expect(copy.authorizedAsOwnerOnly).toEqual(original.authorizedAsOwnerOnly); + expect(copy.channel).toEqual(original.channel); + expect(copy.session).toEqual(original.session); + }); + + it('mutating copy leaves original intact', () => { + const copy = original.copy(); + (copy as any).foo = 'bar'; + + expect((copy as any).foo).toBe('bar'); + expect((original as any).foo).toBeUndefined(); + }); + + it('mutating deep property affects both', () => { + const copy = original.copy(); + copy.channel.code = 'changed'; + + expect(copy.channel.code).toBe('changed'); + expect(original.channel.code).toBe('changed'); + }); + }); + + function createRequestContext() { + let session: CachedSession; + let channel: Channel; + let activeOrder: Order; + let zone: Zone; + activeOrder = new Order({ + id: '55555', + active: true, + code: 'ADAWDJAWD', + }); + session = { + cacheExpiry: Number.MAX_SAFE_INTEGER, + expires: new Date(), + id: '1234', + token: '2d37187e9e8fc47807fe4f58ca', + activeOrderId: '123', + user: { + id: '8833774', + identifier: 'user', + verified: true, + channelPermissions: [], + }, + }; + zone = new Zone({ + id: '62626', + name: 'Europe', + }); + channel = new Channel({ + token: 'oiajwodij09au3r', + id: '995859', + code: '__default_channel__', + currencyCode: CurrencyCode.EUR, + pricesIncludeTax: true, + defaultLanguageCode: LanguageCode.en, + defaultShippingZone: zone, + defaultTaxZone: zone, + }); + return new RequestContext({ + apiType: 'admin', + languageCode: LanguageCode.en, + channel, + session, + isAuthorized: true, + authorizedAsOwnerOnly: false, + }); + } }); diff --git a/packages/core/src/api/common/request-context.ts b/packages/core/src/api/common/request-context.ts index 5087a73d7b..15df581ac9 100644 --- a/packages/core/src/api/common/request-context.ts +++ b/packages/core/src/api/common/request-context.ts @@ -85,8 +85,8 @@ export class RequestContext { /** * @description - * Creates a new RequestContext object from a plain object which is the result of - * a JSON serialization - deserialization operation. + * Creates a new RequestContext object from a serialized object created by the + * `serialize()` method. */ static deserialize(ctxObject: SerializedRequestContext): RequestContext { return new RequestContext({ @@ -102,10 +102,26 @@ export class RequestContext { }); } + /** + * @description + * Serializes the RequestContext object into a JSON-compatible simple object. + * This is useful when you need to send a RequestContext object to another + * process, e.g. to pass it to the Worker process via the {@link WorkerService}. + */ serialize(): SerializedRequestContext { return JSON.parse(JSON.stringify(this)); } + /** + * @description + * Creates a shallow copy of the RequestContext instance. This means that + * mutations to the copy itself will not affect the original, but deep mutations + * (e.g. copy.channel.code = 'new') *will* also affect the original. + */ + copy(): RequestContext { + return Object.assign(Object.create(Object.getPrototypeOf(this)), this); + } + get apiType(): ApiType { return this._apiType; } diff --git a/packages/core/src/event-bus/event-bus.ts b/packages/core/src/event-bus/event-bus.ts index 5b0c5f1c96..478ecd141a 100644 --- a/packages/core/src/event-bus/event-bus.ts +++ b/packages/core/src/event-bus/event-bus.ts @@ -3,6 +3,9 @@ import { Type } from '@vendure/common/lib/shared-types'; import { Observable, Subject } from 'rxjs'; import { filter, takeUntil } from 'rxjs/operators'; +import { RequestContext } from '../api/common/request-context'; +import { TRANSACTION_MANAGER_KEY } from '../common/constants'; + import { VendureEvent } from './vendure-event'; export type EventHandler = (event: T) => void; @@ -33,7 +36,7 @@ export class EventBus implements OnModuleDestroy { handlers[i](event); } } - this.eventStream.next(event); + this.eventStream.next(this.prepareRequestContext(event)); } /** @@ -62,11 +65,39 @@ export class EventBus implements OnModuleDestroy { handlers.push(handler); } this.subscriberMap.set(type, handlers); - return () => this.subscriberMap.set(type, handlers.filter(h => h !== handler)); + return () => + this.subscriberMap.set( + type, + handlers.filter(h => h !== handler), + ); } /** @internal */ onModuleDestroy(): any { this.destroy$.next(); } + + /** + * If the Event includes a RequestContext property, we need to: + * + * 1) Set it as a copy of the original + * 2) Remove the TRANSACTION_MANAGER_KEY from that copy + * + * The TRANSACTION_MANAGER_KEY is used to track transactions across calls + * (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method). + * However, allowing a transaction to continue in an async event subscriber function _will_ cause + * very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why + * we simply remove the reference to the transaction manager from the context object altogether. + */ + private prepareRequestContext(event: T): T { + for (const propertyName of Object.getOwnPropertyNames(event)) { + const property = event[propertyName as keyof T]; + if (property instanceof RequestContext) { + const ctxCopy = property.copy(); + delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY]; + (event[propertyName as keyof T] as any) = ctxCopy; + } + } + return event; + } }