diff --git a/packages/job-queue-plugin/src/bullmq/plugin.ts b/packages/job-queue-plugin/src/bullmq/plugin.ts index ae0a6b8aac..93f73578e8 100644 --- a/packages/job-queue-plugin/src/bullmq/plugin.ts +++ b/packages/job-queue-plugin/src/bullmq/plugin.ts @@ -3,6 +3,7 @@ import { HealthCheckRegistryService, PluginCommonModule, VendurePlugin } from '@ import { BullMQJobQueueStrategy } from './bullmq-job-queue-strategy'; import { BULLMQ_PLUGIN_OPTIONS } from './constants'; import { RedisHealthIndicator } from './redis-health-indicator'; +import { RedisJobBufferStorageStrategy } from './redis-job-buffer-storage-strategy'; import { BullMQPluginOptions } from './types'; /** @@ -103,6 +104,7 @@ import { BullMQPluginOptions } from './types'; imports: [PluginCommonModule], configuration: config => { config.jobQueueOptions.jobQueueStrategy = new BullMQJobQueueStrategy(); + config.jobQueueOptions.jobBufferStorageStrategy = new RedisJobBufferStorageStrategy(); return config; }, providers: [ diff --git a/packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts b/packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts index 33778f6f88..91498fbeb1 100644 --- a/packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts +++ b/packages/job-queue-plugin/src/bullmq/redis-job-buffer-storage-strategy.ts @@ -1,9 +1,11 @@ -import { Injector, Job, JobBufferStorageStrategy } from '@vendure/core'; +import { Injector, Job, JobBufferStorageStrategy, JobConfig, Logger } from '@vendure/core'; import Redis, { Cluster, RedisOptions } from 'ioredis'; -import { BULLMQ_PLUGIN_OPTIONS } from './constants'; +import { BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants'; import { BullMQPluginOptions } from './types'; +const BUFFER_LIST_PREFIX = 'vendure-job-buffer'; + export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy { private redis: Redis.Redis | Redis.Cluster; @@ -18,15 +20,52 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy { } } - async add(processorId: string, job: Job): Promise> { + async add(bufferId: string, job: Job): Promise> { + const result = await this.redis.lpush(this.keyName(bufferId), this.toJobConfigString(job)); return job; } async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> { - throw new Error('Method not implemented.'); + const result: { [bufferId: string]: number } = {}; + for (const id of bufferIds || []) { + const key = this.keyName(id); + const count = await this.redis.llen(key); + result[id] = count; + } + return result; + } + + async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> { + const result: { [bufferId: string]: Job[] } = {}; + for (const id of bufferIds || []) { + const key = this.keyName(id); + const items = await this.redis.lrange(key, 0, -1); + await this.redis.del(key); + result[id] = items.map(item => this.toJob(item)); + } + return result; } - async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Array> }> { - throw new Error('Method not implemented.'); + private keyName(bufferId: string) { + return `${BUFFER_LIST_PREFIX}:${bufferId}`; + } + + private toJobConfigString(job: Job): string { + const jobConfig: JobConfig = { + ...job, + data: job.data, + id: job.id ?? undefined, + }; + return JSON.stringify(jobConfig); + } + + private toJob(jobConfigString: string): Job { + try { + const jobConfig: JobConfig = JSON.parse(jobConfigString); + return new Job(jobConfig); + } catch (e) { + Logger.error(`Could not parse buffered job:\n${e.message}`, loggerCtx, e.stack); + throw e; + } } }