Skip to content

Commit

Permalink
fix(core): Queue concurrent search index writes to avoid key conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Jun 5, 2019
1 parent d45ff11 commit ae1145a
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { unique } from '../../../../../common/lib/unique';
import { RequestContext } from '../../../api/common/request-context';
import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
import { AsyncQueue } from '../async-queue';
import { SearchIndexItem } from '../search-index-item.entity';

import { CompletedMessage, ConnectedMessage, Message, MessageType, ReturnRawBatchMessage, SaveVariantsPayload, VariantsSavedMessage } from './ipc';
Expand Down Expand Up @@ -39,6 +40,7 @@ export class IndexBuilder {
private connection: Connection;
private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;
private onMessageHandlers = new Set<(message: string) => void>();
private queue = new AsyncQueue('search-index');

/**
* When running in the main process, it should be constructed with the existing connection.
Expand Down Expand Up @@ -139,7 +141,7 @@ export class IndexBuilder {
collectionIds: v.collections.map(c => c.id.toString()),
}),
);
await this.connection.getRepository(SearchIndexItem).save(items);
await this.queue.push(() => this.connection.getRepository(SearchIndexItem).save(items));
if (batch === total - 1) {
return new CompletedMessage(true);
} else {
Expand Down

0 comments on commit ae1145a

Please sign in to comment.