Skip to content

Commit

Permalink
fix(worker): remove rateLimit method in favor of queue (#2921)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Nov 22, 2024
1 parent d304267 commit c7c827f
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 92 deletions.
21 changes: 21 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22)


### Bug Fixes

* **queue:** change _jobScheduler from private to protected for extension ([#2920](https://github.com/taskforcesh/bullmq/issues/2920)) ([34c2348](https://github.com/taskforcesh/bullmq/commit/34c23485bcb32b3c69046b2fb37e5db8927561ce))

## [5.28.1](https://github.com/taskforcesh/bullmq/compare/v5.28.0...v5.28.1) (2024-11-20)


### Bug Fixes

* **scheduler:** use Job class from getter for extension ([#2917](https://github.com/taskforcesh/bullmq/issues/2917)) ([5fbb075](https://github.com/taskforcesh/bullmq/commit/5fbb075dd4abd51cc84a59575261de84e56633d8))

# [5.28.0](https://github.com/taskforcesh/bullmq/compare/v5.27.0...v5.28.0) (2024-11-19)


### Features

* **job-scheduler:** add telemetry support to the job scheduler ([72ea950](https://github.com/taskforcesh/bullmq/commit/72ea950ea251aa12f879ba19c0b5dfeb6a093da2))

# [5.27.0](https://github.com/taskforcesh/bullmq/compare/v5.26.2...v5.27.0) (2024-11-19)


Expand Down
12 changes: 7 additions & 5 deletions docs/gitbook/guide/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,23 @@ await queue.add('rate limited paint', { customerId: 'my-customer-id' });

Sometimes is useful to rate-limit a queue manually instead of based on some static options. For example, you may have an API that returns `429 Too Many Requests`, and you want to rate-limit the queue based on that response.

For this purpose, you can use the worker method **`rateLimit`** like this:
For this purpose, you can use the queue method **`rateLimit`** like this:

```typescript
import { Worker } from 'bullmq';
import { Queue, RateLimitError, Worker } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async () => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
Expand Down Expand Up @@ -130,6 +132,6 @@ By removing rate limit key, workers will be able to pick jobs again and your rat

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl)
- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey)
10 changes: 6 additions & 4 deletions docs/gitbook/patterns/stop-retrying-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@ await queue.add(
When a job is rate limited using `Worker.RateLimitError` and tried again, the `attempts` check is ignored, as rate limiting is not considered a real error. However, if you want to manually check the attempts and avoid retrying the job, you can do the following:

```typescript
import { Worker, UnrecoverableError } from 'bullmq';
import { Queue, RateLimitError, Worker, UnrecoverableError } from 'bullmq';

const queue = new Queue('myQueue', { connection });

const worker = new Worker(
'myQueue',
async job => {
const [isRateLimited, duration] = await doExternalCall();
if (isRateLimited) {
await worker.rateLimit(duration);
await queue.rateLimit(duration);
if (job.attemptsMade >= job.opts.attempts) {
throw new UnrecoverableError('Unrecoverable');
}
// Do not forget to throw this special exception,
// since we must differentiate this case from a failure
// in order to move the job to wait again.
throw Worker.RateLimitError();
throw new RateLimitError();
}
},
{
Expand All @@ -54,4 +56,4 @@ const worker = new Worker(

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#rateLimit)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.27.0",
"version": "5.28.2",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
1 change: 1 addition & 0 deletions src/classes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './child-processor';
export * from './errors';
export * from './flow-producer';
export * from './job';
export * from './job-scheduler';
// export * from './main'; this file must not be exported
// export * from './main-worker'; this file must not be exported
export * from './queue-base';
Expand Down
78 changes: 51 additions & 27 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface JobSchedulerJson {
key: string; // key is actually the job scheduler id
Expand Down Expand Up @@ -46,6 +47,12 @@ export class JobScheduler extends QueueBase {
);
}

if (!pattern && !every) {
throw new Error(
'Either .pattern or .every options must be defined for this repeatable job',
);
}

if (repeatOpts.immediately && repeatOpts.startDate) {
throw new Error(
'Both .immediately and .startDate options are defined for this repeatable job',
Expand Down Expand Up @@ -77,7 +84,7 @@ export class JobScheduler extends QueueBase {
now = startMillis > now ? startMillis : now;
}

let nextMillis;
let nextMillis: number;
if (every) {
nextMillis = prevMillis + every;

Expand All @@ -92,7 +99,7 @@ export class JobScheduler extends QueueBase {
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
await this.scripts.addJobScheduler(
this.scripts.addJobScheduler(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
Expand All @@ -105,37 +112,54 @@ export class JobScheduler extends QueueBase {
},
);
} else {
await this.scripts.updateJobSchedulerNextMillis(
this.scripts.updateJobSchedulerNextMillis(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
);
}

const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
jobSchedulerId,
{ ...opts, repeat: filteredRepeatOpts },
jobData,
iterationCount,
return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetryMetadata: srcPropagationMedatada,
},
jobData,
iterationCount,
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
},
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;
return job;
}
}

Expand Down Expand Up @@ -170,7 +194,7 @@ export class JobScheduler extends QueueBase {

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

const job = new Job<T, R, N>(this, name, data, mergedOpts, jobId);
const job = new this.Job<T, R, N>(this, name, data, mergedOpts, jobId);
job.addJob(client);

return job;
Expand Down
7 changes: 0 additions & 7 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,6 @@ export class QueueGetters<JobBase extends Job = Job> extends QueueBase {
});
}

/**
* Helper to easily extend Job class calls.
*/
protected get Job(): typeof Job {
return Job;
}

private sanitizeJobTypes(types: JobType[] | JobType | undefined): JobType[] {
const currentTypes = typeof types === 'string' ? [types] : types;

Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class Queue<
protected libName = 'bullmq';

private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler
private _jobScheduler?: JobScheduler;
protected _jobScheduler?: JobScheduler;

constructor(
name: string,
Expand Down
32 changes: 0 additions & 32 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ export class Worker<
protected processFn: Processor<DataType, ResultType, NameType>;
protected running = false;

static RateLimitError(): Error {
return new RateLimitError();
}

constructor(
name: string,
processor?: string | URL | null | Processor<DataType, ResultType, NameType>,
Expand Down Expand Up @@ -624,34 +620,6 @@ export class Worker<
}
}

/**
* Overrides the rate limit to be active for the next jobs.
* @deprecated This method is deprecated and will be removed in v6. Use queue.rateLimit method instead.
* @param expireTimeMs - expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number): Promise<void> {
await this.trace<void>(
SpanKind.INTERNAL,
'rateLimit',
this.name,
async span => {
span?.setAttributes({
[TelemetryAttributes.WorkerId]: this.id,
[TelemetryAttributes.WorkerRateLimit]: expireTimeMs,
});

await this.client.then(client =>
client.set(
this.keys.limiter,
Number.MAX_SAFE_INTEGER,
'PX',
expireTimeMs,
),
);
},
);
}

get minimumBlockTimeout(): number {
return this.blockingConnection.capabilities.canBlockFor1Ms
? /* 1 millisecond is chosen because the granularity of our timestamps are milliseconds.
Expand Down
1 change: 1 addition & 0 deletions src/enums/telemetry-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export enum TelemetryAttributes {
JobResult = 'bullmq.job.result',
JobFailedReason = 'bullmq.job.failed.reason',
FlowName = 'bullmq.flow.name',
JobSchedulerId = 'bullmq.job.scheduler.id',
}

export enum SpanKind {
Expand Down
20 changes: 20 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,12 @@ describe('Job Scheduler', function () {
);
});

it('should throw an error when not specifying .pattern or .every', async function () {
await expect(queue.upsertJobScheduler('repeat', {})).to.be.rejectedWith(
'Either .pattern or .every options must be defined for this repeatable job',
);
});

it('should throw an error when using .immediately and .startDate simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
Expand All @@ -1821,6 +1827,20 @@ describe('Job Scheduler', function () {
);
});

it("should return a valid job with the job's options and data passed as the job template", async function () {
const repeatOpts = {
every: 1000,
};

const job = await queue.upsertJobScheduler('test', repeatOpts, {
data: { foo: 'bar' },
});

expect(job).to.be.ok;
expect(job!.data.foo).to.be.eql('bar');
expect(job!.opts.repeat!.every).to.be.eql(1000);
});

it('should emit a waiting event when adding a repeatable job to the waiting list', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
Expand Down
Loading

0 comments on commit c7c827f

Please sign in to comment.