Skip to content

Commit

Permalink
fix(core): Reduce chance of index err in assigning variants to channels
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Feb 4, 2021
1 parent 25ba87d commit 8a1ff82
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 25 deletions.
11 changes: 1 addition & 10 deletions packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ describe('Default search plugin', () => {
}, TEST_SETUP_TIMEOUT_MS);

afterAll(async () => {
await awaitRunningJobs(adminClient);
await server.destroy();
});

Expand Down Expand Up @@ -938,11 +939,6 @@ describe('Default search plugin', () => {
});
await awaitRunningJobs(adminClient);

if (process.env.DB === 'postgres') {
// The postgres test is kinda flaky so we stick in a pause for good measure
await new Promise(resolve => setTimeout(resolve, 1000));
}

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);

const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true });
Expand All @@ -969,11 +965,6 @@ describe('Default search plugin', () => {
});
await awaitRunningJobs(adminClient);

if (process.env.DB === 'postgres') {
// The postgres test is kinda flaky so we stick in a pause for good measure
await new Promise(resolve => setTimeout(resolve, 1000));
}

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);

const { search: searchGrouped } = await doAdminSearchQuery({ groupByProduct: true });
Expand Down
20 changes: 16 additions & 4 deletions packages/core/src/service/services/product-variant.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,12 +528,18 @@ export class ProductVariantService {
variant.price * priceFactor,
input.channelId,
);
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
}
return this.findByIds(
const result = await this.findByIds(
ctx,
variants.map(v => v.id),
);
// Publish the events at the latest possible stage to decrease the chance of race conditions
// whereby an event listener triggers a query which does not yet have access to the changes
// within the current transaction.
for (const variant of variants) {
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'assigned'));
}
return result;
}

async removeProductVariantsFromChannel(
Expand Down Expand Up @@ -575,12 +581,18 @@ export class ProductVariantService {
input.channelId,
]);
}
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
}
return this.findByIds(
const result = await this.findByIds(
ctx,
variants.map(v => v.id),
);
// Publish the events at the latest possible stage to decrease the chance of race conditions
// whereby an event listener triggers a query which does not yet have access to the changes
// within the current transaction.
for (const variant of variants) {
this.eventBus.publish(new ProductVariantChannelEvent(ctx, variant, input.channelId, 'removed'));
}
return result;
}

private async validateVariantOptionIds(ctx: RequestContext, input: CreateProductVariantInput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ describe('Elasticsearch plugin', () => {
}, TEST_SETUP_TIMEOUT_MS);

afterAll(async () => {
await awaitRunningJobs(adminClient);
await server.destroy();
});

Expand Down Expand Up @@ -808,11 +809,6 @@ describe('Elasticsearch plugin', () => {
});
await awaitRunningJobs(adminClient);

if (process.env.DB === 'postgres') {
// The postgres test is kinda flaky so we stick in a pause for good measure
await new Promise(resolve => setTimeout(resolve, 1000));
}

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);

const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
Expand Down Expand Up @@ -843,11 +839,6 @@ describe('Elasticsearch plugin', () => {
});
await awaitRunningJobs(adminClient);

if (process.env.DB === 'postgres') {
// The postgres test is kinda flaky so we stick in a pause for good measure
await new Promise(resolve => setTimeout(resolve, 1000));
}

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);

const { search: searchGrouped } = await doAdminSearchQuery(adminClient, {
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
ctx: rawContext,
productVariantId,
channelId,
}: AssignVariantToChannelMessage['data']): Observable<AssignVariantToChannelMessage['response']> {
}: RemoveVariantFromChannelMessage['data']): Observable<RemoveVariantFromChannelMessage['response']> {
const ctx = RequestContext.deserialize(rawContext);
return asyncObservable(async () => {
const productVariant = await this.connection.getEntityOrThrow(
Expand Down

0 comments on commit 8a1ff82

Please sign in to comment.