diff --git a/tests/test_flow.ts b/tests/test_flow.ts index 8736904bb6..0369a39988 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -3286,6 +3286,102 @@ describe('flows', () => { }); }); + describe('when children have delay', () => { + it('moves children to delayed', async () => { + const name = 'child-job'; + const values = [{ idx: 0, bar: 'something' }]; + + const topQueueName = `top-queue-${v4()}`; + + let parentProcessor; + const childrenWorker = new Worker( + queueName, + async (job: Job) => { + await delay(500); + return values[job.data.idx]; + }, + { + connection, + prefix, + }, + ); + + const completed = new Promise((resolve, reject) => { + childrenWorker.on('completed', async function () { + resolve(); + }); + }); + + const processingTop = new Promise((resolve, reject) => [ + (parentProcessor = async (job: Job) => { + try { + const { processed } = await job.getDependencies(); + expect(Object.keys(processed)).to.have.length(1); + + const childrenValues = await job.getChildrenValues(); + + const jobKey = queue.toKey(tree.children[0].job.id); + expect(childrenValues[jobKey]).to.be.deep.equal(values[0]); + expect(processed[jobKey]).to.be.deep.equal(values[0]); + + resolve(); + } catch (err) { + console.error(err); + reject(err); + } + }), + ]); + + const parentWorker = new Worker(topQueueName, parentProcessor, { + connection, + prefix, + }); + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'root-job', + queueName: topQueueName, + data: {}, + children: [ + { + name, + data: { idx: 0, foo: 'bar' }, + queueName, + opts: { + delay: 2000, + }, + }, + ], + }); + + expect(tree).to.have.property('job'); + expect(tree).to.have.property('children'); + + const { children, job } = tree; + const isWaitingChildren = await job.isWaitingChildren(); + + expect(isWaitingChildren).to.be.true; + expect(children).to.have.length(1); + + expect(children[0].job.id).to.be.ok; + expect(children[0].job.data.foo).to.be.eql('bar'); + + const isDelayed = await children![0].job.isDelayed(); + + expect(isDelayed).to.be.true; + + await completed; + + await childrenWorker.close(); + + await processingTop; + await parentWorker.close(); + await flow.close(); + + await removeAllQueueData(new IORedis(redisHost), topQueueName); + }); + }); + it('should not process parent if child fails', async () => { const name = 'child-job';