Skip to content

Commit

Permalink
fix(core): Reduce chance of index err in assigning variants to channels
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Feb 10, 2021
1 parent 73d673c commit 58e3f7b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
3 changes: 1 addition & 2 deletions packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ describe('Default search plugin', () => {
}, TEST_SETUP_TIMEOUT_MS);

afterAll(async () => {
await awaitRunningJobs(adminClient);
await server.destroy();
});

Expand Down Expand Up @@ -937,8 +938,6 @@ describe('Default search plugin', () => {
input: { channelId: secondChannel.id, productVariantIds: ['T_10', 'T_15'] },
});
await awaitRunningJobs(adminClient);
// The postgres test is kinda flaky so we stick in a pause for good measure
await new Promise(resolve => setTimeout(resolve, 500));

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);

Expand Down
20 changes: 16 additions & 4 deletions packages/core/src/service/services/product-variant.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,18 @@ export class ProductVariantService {
variant.price * priceFactor,
input.channelId,
);
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
}
return this.findByIds(
const result = await this.findByIds(
ctx,
variants.map(v => v.id),
);
// Publish the events at the latest possible stage to decrease the chance of race conditions
// whereby an event listener triggers a query which does not yet have access to the changes
// within the current transaction.
for (const variant of variants) {
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
}
return result;
}

async removeProductVariantsFromChannel(
Expand Down Expand Up @@ -544,12 +550,18 @@ export class ProductVariantService {
input.channelId,
]);
}
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
}
return this.findByIds(
const result = await this.findByIds(
ctx,
variants.map(v => v.id),
);
// Publish the events at the latest possible stage to decrease the chance of race conditions
// whereby an event listener triggers a query which does not yet have access to the changes
// within the current transaction.
for (const variant of variants) {
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
}
return result;
}

private async validateVariantOptionIds(ctx: RequestContext, input: CreateProductVariantInput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ describe('Elasticsearch plugin', () => {
}, TEST_SETUP_TIMEOUT_MS);

afterAll(async () => {
await awaitRunningJobs(adminClient);
await server.destroy();
});

Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
ctx: rawContext,
productVariantId,
channelId,
}: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
}: RemoveVariantFromChannelMessage['data']): Observable<RemoveVariantFromChannelMessage['response']> {
const ctx = RequestContext.deserialize(rawContext);
return asyncObservable(async () => {
const productVariant = await this.connection.getEntityOrThrow(
Expand Down

0 comments on commit 58e3f7b

Please sign in to comment.