Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: taskforcesh/bullmq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v5.28.2
Choose a base ref
...
head repository: taskforcesh/bullmq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v5.29.0
Choose a head ref
  • 3 commits
  • 6 files changed
  • 2 contributors

Commits on Nov 22, 2024

  1. Copy the full SHA
    09f2571 View commit details
  2. test(worker): improve flaky test

    manast committed Nov 22, 2024
    Copy the full SHA
    88a720c View commit details
  3. chore(release): 5.29.0 [skip ci]

    # [5.29.0](v5.28.2...v5.29.0) (2024-11-22)
    
    ### Features
    
    * **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](09f2571))
    semantic-release-bot committed Nov 22, 2024
    Copy the full SHA
    bf03f86 View commit details
Showing with 79 additions and 49 deletions.
  1. +7 −0 docs/gitbook/changelog.md
  2. +1 −1 package.json
  3. +55 −36 src/classes/queue.ts
  4. +1 −1 src/classes/worker.ts
  5. +1 −1 src/version.ts
  6. +14 −10 tests/test_worker.ts
7 changes: 7 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# [5.29.0](https://github.com/taskforcesh/bullmq/compare/v5.28.2...v5.29.0) (2024-11-22)


### Features

* **queue:** refactor a protected addJob method allowing telemetry extensions ([09f2571](https://github.com/taskforcesh/bullmq/commit/09f257196f6d5a6690edbf55f12d585cec86ee8f))

## [5.28.2](https://github.com/taskforcesh/bullmq/compare/v5.28.1...v5.28.2) (2024-11-22)


2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.28.2",
"version": "5.29.0",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
91 changes: 55 additions & 36 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -311,47 +311,66 @@ export class Queue<
opts = { ...opts, telemetryMetadata: srcPropagationMedatada };
}

if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error(
'End date must be greater than current timestamp',
);
}
}
const job = await this.addJob(name, data, opts);

return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;
span?.setAttributes({
[TelemetryAttributes.JobName]: name,
[TelemetryAttributes.JobId]: job.id,
});

if (jobId == '0' || jobId?.startsWith('0:')) {
throw new Error("JobId cannot be '0' or start with 0:");
}
return job;
},
);
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);
/**
* addJob is a telemetry free version of the add method, useful in order to wrap it
* with custom telemetry on subclasses.
*
* @param name
* @param data
* @param opts
*
* @returns Job
*/
protected async addJob(
name: NameType,
data: DataType,
opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>> {
if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error('End date must be greater than current timestamp');
}
}

span?.setAttributes({
[TelemetryAttributes.JobId]: job.id,
});
return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;

return job;
}
},
);
if (jobId == '0' || jobId?.startsWith('0:')) {
throw new Error("JobId cannot be '0' or start with 0:");
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);

return job;
}
}

/**
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
@@ -995,7 +995,7 @@ will never work with more accuracy than 1ms. */
* This method waits for current jobs to finalize before returning.
*
* @param force - Use force boolean parameter if you do not want to wait for
* current jobs to be processed. When using telemetry, be mindful that it can
* current jobs to be processed. When using telemetry, be mindful that it can
* interfere with the proper closure of spans, potentially preventing them from being exported.
*
* @returns Promise that resolves when the worker has been closed.
2 changes: 1 addition & 1 deletion src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const version = '5.28.2';
export const version = '5.29.0';
24 changes: 14 additions & 10 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
@@ -510,9 +510,17 @@ describe('workers', function () {
await worker.close();
});

it('do not call moveToActive more than number of jobs + 1', async () => {
it('do not call moveToActive more than number of jobs + 2', async () => {
const numJobs = 50;
let completedJobs = 0;

const jobs: Promise<Job>[] = [];
for (let i = 0; i < numJobs; i++) {
jobs.push(queue.add('test', { foo: 'bar' }));
}

await Promise.all(jobs);

const worker = new Worker(
queueName,
async job => {
@@ -521,22 +529,16 @@ describe('workers', function () {
},
{ connection, prefix, concurrency: 100 },
);
await worker.waitUntilReady();

// Add spy to worker.moveToActive
const spy = sinon.spy(worker, 'moveToActive');
const bclientSpy = sinon.spy(
await worker.blockingConnection.client,
'bzpopmin',
);
await worker.waitUntilReady();

for (let i = 0; i < numJobs; i++) {
const job = await queue.add('test', { foo: 'bar' });
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
}

expect(bclientSpy.callCount).to.be.equal(1);
expect(bclientSpy.callCount).to.be.equal(0);

await new Promise<void>((resolve, reject) => {
worker.on('completed', (job: Job, result: any) => {
@@ -547,9 +549,11 @@ describe('workers', function () {
});
});

expect(completedJobs).to.be.equal(numJobs);
expect(bclientSpy.callCount).to.be.equal(2);

// Check moveToActive was called numJobs + 2 times
expect(spy.callCount).to.be.equal(numJobs + 2);
expect(bclientSpy.callCount).to.be.equal(3);

await worker.close();
});