diff --git a/packages/core/e2e/default-search-plugin.e2e-spec.ts b/packages/core/e2e/default-search-plugin.e2e-spec.ts index 9ca7262095..4c00b85506 100644 --- a/packages/core/e2e/default-search-plugin.e2e-spec.ts +++ b/packages/core/e2e/default-search-plugin.e2e-spec.ts @@ -78,6 +78,7 @@ describe('Default search plugin', () => { }, TEST_SETUP_TIMEOUT_MS); afterAll(async () => { + await awaitRunningJobs(adminClient); await server.destroy(); }); @@ -938,11 +939,6 @@ describe('Default search plugin', () => { }); await awaitRunningJobs(adminClient); - if (process.env.DB === 'postgres') { - // The postgres test is kinda flaky so we stick in a pause for good measure - await new Promise(resolve => setTimeout(resolve, 1000)); - } - adminClient.setChannelToken(SECOND_CHANNEL_TOKEN); const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true }); @@ -969,11 +965,6 @@ describe('Default search plugin', () => { }); await awaitRunningJobs(adminClient); - if (process.env.DB === 'postgres') { - // The postgres test is kinda flaky so we stick in a pause for good measure - await new Promise(resolve => setTimeout(resolve, 1000)); - } - adminClient.setChannelToken(SECOND_CHANNEL_TOKEN); const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true }); diff --git a/packages/core/src/service/services/product-variant.service.ts b/packages/core/src/service/services/product-variant.service.ts index 622c505370..10dece8616 100644 --- a/packages/core/src/service/services/product-variant.service.ts +++ b/packages/core/src/service/services/product-variant.service.ts @@ -528,12 +528,18 @@ export class ProductVariantService { variant.price * priceFactor, input.channelId, ); - this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned')); } - return this.findByIds( + const result = await this.findByIds( ctx, variants.map(v => v.id), ); + // Publish the events at the latest possible stage to decrease the chance of race conditions + // whereby an event listener triggers a query which does not yet have access to the changes + // within the current transaction. + for (const variant of variants) { + this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned')); + } + return result; } async removeProductVariantsFromChannel( @@ -575,12 +581,18 @@ export class ProductVariantService { input.channelId, ]); } - this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed')); } - return this.findByIds( + const result = await this.findByIds( ctx, variants.map(v => v.id), ); + // Publish the events at the latest possible stage to decrease the chance of race conditions + // whereby an event listener triggers a query which does not yet have access to the changes + // within the current transaction. + for (const variant of variants) { + this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed')); + } + return result; } private async validateVariantOptionIds(ctx: RequestContext, input: CreateProductVariantInput) { diff --git a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts index 0d9af74c7b..e860228d99 100644 --- a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts +++ b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts @@ -131,6 +131,7 @@ describe('Elasticsearch plugin', () => { }, TEST_SETUP_TIMEOUT_MS); afterAll(async () => { + await awaitRunningJobs(adminClient); await server.destroy(); }); @@ -808,11 +809,6 @@ describe('Elasticsearch plugin', () => { }); await awaitRunningJobs(adminClient); - if (process.env.DB === 'postgres') { - // The postgres test is kinda flaky so we stick in a pause for good measure - await new Promise(resolve => setTimeout(resolve, 1000)); - } - adminClient.setChannelToken(SECOND_CHANNEL_TOKEN); const { search: searchGrouped } = await doAdminSearchQuery(adminClient, { @@ -843,11 +839,6 @@ describe('Elasticsearch plugin', () => { }); await awaitRunningJobs(adminClient); - if (process.env.DB === 'postgres') { - // The postgres test is kinda flaky so we stick in a pause for good measure - await new Promise(resolve => setTimeout(resolve, 1000)); - } - adminClient.setChannelToken(SECOND_CHANNEL_TOKEN); const { search: searchGrouped } = await doAdminSearchQuery(adminClient, { diff --git a/packages/elasticsearch-plugin/src/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexer.controller.ts index 813cef3f6c..bf0754ca50 100644 --- a/packages/elasticsearch-plugin/src/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexer.controller.ts @@ -191,7 +191,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes ctx: rawContext, productVariantId, channelId, - }: AssignVariantToChannelMessage['data']): Observable { + }: RemoveVariantFromChannelMessage['data']): Observable { const ctx = RequestContext.deserialize(rawContext); return asyncObservable(async () => { const productVariant = await this.connection.getEntityOrThrow(