Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better control over job queue execution #1137

Closed
michaelbromley opened this issue Oct 7, 2021 Discussed in #1133 · 1 comment
Closed

Better control over job queue execution #1137

michaelbromley opened this issue Oct 7, 2021 Discussed in #1133 · 1 comment
Assignees
Labels
design 📐 This issue deals with high-level design of a feature type: feature ✨ @vendure/core

Comments

@michaelbromley
Copy link
Member

See this discussion for a detailed account of the issue and exploration of solutions:

Discussed in #1133

Originally posted by Izayda October 5, 2021
Currently, bulk import/update of products generate huge amount of similar operations:

  • Any change of variant/product generate apply-collections-filters job. Problems:
    • Each time job perform equal operation (iterate over each collection and apply collections filters), but that can be done just once after last update/import.
  • Any change of variant/product generate update-index-job. Problems:
    • Job is called from product variant level. But is executed at product level, because each job get product for variant and reindex all variants for product. So, there will be many equal jobs.
    • Job is dependent from collections. But can be execute in parallel with apply-collection-job, so, outdated collections data can be passed to index

The aim of this discussion: to determine issues, that should be resolved to eliminate this problems.
Original discussion in Slack by @skid: https://vendure-ecommerce.slack.com/archives/CKYMF0ZTJ/p1632747675049800

Probably possible ways:

  1. Turn off 'apply-collections-filters' and 'update-search-index' jobs. Update variants and products. Complete 'apply-collections-filters' job. Then complete full search reindex.
  2. Make a mutation "startBatchUpdate" that gives us batchUpdateId. Update variants and products and pass to all update/create/delete operations this batchUpdateId. Make a mutation "finishBatchUpdate", that accept batchUpdateId. This mutation complete 'apply-collections-filters' job and then start reindexing of affected variants.
  3. Make new job "update-variant-collections". This job should get all filters and calculate in which collections this variant should be mapped. Then this job get actual collections for this variant and calculate "differ". Then update that "differ". Then this job start reindexing of affected variant.

Is it possible to implement third way (to get collection filters to update from updated data of variant)?
If yes, i think combination of 2 and 3 can be implemented. If no, only second variant.

@michaelbromley need your input and decision.

@michaelbromley
Copy link
Member Author

michaelbromley commented Oct 8, 2021

This is now implemented and will be available in prerelease v1.3.0-beta.1.

Still to do: Documentation and e2e & unit tests.

Here's an example of the API in use:

/**
 * This is a buffer which will collection all the 'apply-collection-filters' jobs and buffer them.
 */
export class CollectionJobBuffer implements JobBuffer<ApplyCollectionFiltersJobData> {
    readonly id = 'search-plugin-apply-collection-filters';

    collect(job: Job): boolean {
        return job.queueName === 'apply-collection-filters';
    }

   /**
    * When the buffer gets flushed, this function will be passed all the collected jobs
    * and will reduce them down to a single job that has aggregated all of the collectionIds.
    */ 
    reduce(collectedJobs: Array<Job<ApplyCollectionFiltersJobData>>): Array<Job<any>> {
        const collectionIdsToUpdate = collectedJobs.reduce((result, job) => {
            return [...result, ...job.data.collectionIds];
        }, [] as ID[]);

        const referenceJob = collectedJobs[0];
        const batchedCollectionJob = new Job<ApplyCollectionFiltersJobData>({
            ...referenceJob,
            id: undefined,
            data: {
                collectionIds: unique(collectionIdsToUpdate),
                ctx: referenceJob.data.ctx,
                applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly,
            },
        });

        return [batchedCollectionJob];
    }
}

This buffer is used in the DefaultSearchPlugin like this:

export class SearchJobBufferService implements OnApplicationBootstrap {

    readonly collectionJobBuffer = new CollectionJobBuffer();

    onApplicationBootstrap(): any {
        if (this.bufferUpdates === true) {
            this.jobQueueService.addBuffer(this.collectionJobBuffer);
        }
    }

    async runPendingSearchUpdates(): Promise<void> {
        if (!this.bufferUpdates) {
            return;
        }
        await this.jobQueueService.flush(this.collectionJobBuffer);
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
design 📐 This issue deals with high-level design of a feature type: feature ✨ @vendure/core
Projects
None yet
Development

No branches or pull requests

1 participant