Skip to content

Commit

Permalink
feat(queue): add rateLimit method
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Nov 12, 2024
1 parent fa51711 commit 0d595da
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 16 deletions.
10 changes: 0 additions & 10 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,3 @@ export const defaultRepeatStrategy = (
// Ignore error
}
};

function removeUndefinedFields(obj: Record<string, any>) {
const newObj: Record<string, any> = {};
for (const key in obj) {
if (obj[key] !== undefined) {
newObj[key] = obj[key];
}
}
return newObj;
}
28 changes: 28 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,34 @@ export class Queue<
await super.close();
});
}

/**
* Overrides the rate limit to be active for the next jobs.
*
* @param expireTimeMs - expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number): Promise<void> {
await this.trace<void>(
SpanKind.INTERNAL,
'rateLimit',
this.name,
async span => {
span?.setAttributes({
[TelemetryAttributes.QueueRateLimit]: expireTimeMs,
});

await this.client.then(client =>
client.set(
this.keys.limiter,
Number.MAX_SAFE_INTEGER,
'PX',
expireTimeMs,
),
);
},
);
}

/**
* Resumes the processing of this queue globally.
*
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ export class Worker<

/**
* Overrides the rate limit to be active for the next jobs.
*
* @deprecated This method is deprecated and will be removed in v6. Use queue.rateLimit method instead.
* @param expireTimeMs - expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions src/enums/telemetry-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export enum TelemetryAttributes {
QueueDrainDelay = 'bullmq.queue.drain.delay',
QueueGrace = 'bullmq.queue.grace',
QueueCleanLimit = 'bullmq.queue.clean.limit',
QueueRateLimit = 'bullmq.queue.rate.limit',
JobType = 'bullmq.job.type',
QueueOptions = 'bullmq.queue.options',
QueueEventMaxLength = 'bullmq.queue.event.max.length',
Expand Down
16 changes: 11 additions & 5 deletions tests/test_concurrency.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { default as IORedis } from 'ioredis';
import { FlowProducer, QueueEvents, Queue, Worker } from '../src/classes';
import {
FlowProducer,
QueueEvents,
Queue,
Worker,
RateLimitError,
} from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';
import { beforeEach, describe, it, after as afterAll } from 'mocha';
import { v4 } from 'uuid';
Expand Down Expand Up @@ -156,8 +162,8 @@ describe('Concurrency', () => {
queueName,
async job => {
if (job.attemptsStarted === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
await queue.rateLimit(dynamicLimit);
throw new RateLimitError();
}
},
{
Expand Down Expand Up @@ -240,8 +246,8 @@ describe('Concurrency', () => {
queueName,
async job => {
if (job.attemptsStarted === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
await queue.rateLimit(dynamicLimit);
throw new RateLimitError();
}
},
{
Expand Down

0 comments on commit 0d595da

Please sign in to comment.