Skip to content

Commit

Permalink
test(flow): add test case where child has a delay (#2910)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Nov 16, 2024
1 parent 65c7291 commit 8204ea3
Showing 1 changed file with 96 additions and 0 deletions.
96 changes: 96 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3286,6 +3286,102 @@ describe('flows', () => {
});
});

describe('when children have delay', () => {
it('moves children to delayed', async () => {
const name = 'child-job';
const values = [{ idx: 0, bar: 'something' }];

const topQueueName = `top-queue-${v4()}`;

let parentProcessor;
const childrenWorker = new Worker(
queueName,
async (job: Job) => {
await delay(500);
return values[job.data.idx];
},
{
connection,
prefix,
},
);

const completed = new Promise<void>((resolve, reject) => {
childrenWorker.on('completed', async function () {
resolve();
});
});

const processingTop = new Promise<void>((resolve, reject) => [
(parentProcessor = async (job: Job) => {
try {
const { processed } = await job.getDependencies();
expect(Object.keys(processed)).to.have.length(1);

const childrenValues = await job.getChildrenValues();

const jobKey = queue.toKey(tree.children[0].job.id);
expect(childrenValues[jobKey]).to.be.deep.equal(values[0]);
expect(processed[jobKey]).to.be.deep.equal(values[0]);

resolve();
} catch (err) {
console.error(err);
reject(err);
}
}),
]);

const parentWorker = new Worker(topQueueName, parentProcessor, {
connection,
prefix,
});

const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'root-job',
queueName: topQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
opts: {
delay: 2000,
},
},
],
});

expect(tree).to.have.property('job');
expect(tree).to.have.property('children');

const { children, job } = tree;
const isWaitingChildren = await job.isWaitingChildren();

expect(isWaitingChildren).to.be.true;
expect(children).to.have.length(1);

expect(children[0].job.id).to.be.ok;
expect(children[0].job.data.foo).to.be.eql('bar');

const isDelayed = await children![0].job.isDelayed();

expect(isDelayed).to.be.true;

await completed;

await childrenWorker.close();

await processingTop;
await parentWorker.close();
await flow.close();

await removeAllQueueData(new IORedis(redisHost), topQueueName);
});
});

it('should not process parent if child fails', async () => {
const name = 'child-job';

Expand Down

0 comments on commit 8204ea3

Please sign in to comment.