Complex task control flow (e.g., batches) #120
I think that if you explain what you want to accomplish, I could provide you with a code example or something similar.
Thanks @manast! Now that I've actually written out what I have in mind using a fictional API, I realize it's probably a lot of work and not really worth it; I should probably stick to bigger jobs rather than splitting them up. Still, I thought I'd share it anyway in case you have any advice (and who knows, you may find it amusing!), but don't spend too much time on this! Cheers!

Here's a simplified promise-based version of what I'm trying to accomplish:

```javascript
// `request` is assumed to be a promise-returning HTTP helper that resolves to parsed JSON

// Initial job to fire this /list-users call
request('/list-users').then(function(userUrls) {
  // Which in turn fires many jobs
  return Promise.all(userUrls.map(function(userUrl) {
    return request(userUrl).then(function(user) {
      return user.interestingInfos;
    });
  }));
}).then(function(allUsers) {
  // Reassemble all data somehow
  return allUsers.reduce(function(sum, userInfo) {
    return userInfo.salary + sum;
  }, 0);
});
```

I am trying to split this code into multiple small jobs, in the hope of gaining:
Here's an example of what I have in mind (somewhat pseudocode):

```javascript
// Assumes `request` is a promise-returning HTTP helper and `redis` is a
// promise-based Redis client. GroupQueue and the `chain` argument are the
// proposed (fictive) APIs from this comment, not existing Bull features.

// New type of queue that has two "process" handlers: a regular worker
// and a handler that is called when all child jobs have completed
var GroupQueue = require('bull/lib/group-queue');
var Queue = require('bull');

var listUserQueue = Queue('list-user', 6379, '127.0.0.1');
var fetchMultipleUsersQueue = GroupQueue('fetch-multiple-user', 6379, '127.0.0.1');

listUserQueue.add({ url: '/list-users' });

listUserQueue.process(function(job, done, chain) {
  request(job.data.url).then(function(userUrls) {
    var fetchMultipleJobsParams = userUrls.map(function(url) {
      return { url: url };
    });
    var groupJob = fetchMultipleUsersQueue.add(fetchMultipleJobsParams);
    // chain: new concept that makes this job dependent on another job.
    // I _really_ have no idea if that can be done in a sane way.
    chain(groupJob);
  }).catch(done);
});

fetchMultipleUsersQueue.process({
  // Called to process a single job
  unit: function(job, done) {
    request(job.data.url).then(function(user) {
      var json = JSON.stringify(user.interestingInfos);
      // Wait until the result is stored before marking the job as done
      return redis.set(fetchMultipleUsersQueue.toKey('result:' + job.jobId), json);
    }).then(function() {
      done();
    }).catch(done);
  },
  // Called when all single jobs have completed for this group
  group: function(jobs, done) {
    var resultKeys = jobs.map(function(job) {
      return fetchMultipleUsersQueue.toKey('result:' + job.jobId);
    });
    redis.mget(resultKeys).then(function(allUsers) {
      var result = allUsers.map(function(json) {
        return JSON.parse(json);
      }).reduce(function(sum, userInfo) {
        return userInfo.salary + sum;
      }, 0);
      // Do something with result, save it to the database or something
      done();
    }).catch(done);
  }
});
```
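The `unit`/`group` split above can be approximated without a new queue type by keeping a per-batch counter of unfinished child jobs: each child stores its result and decrements the counter, and whichever child brings it to zero runs the group handler exactly once. The sketch below is a hypothetical in-memory version, assuming a `Map` as a stand-in for Redis; `createBatchTracker`, `start`, and `completeChild` are illustrative names, not Bull APIs. With real Redis the decrement would have to be atomic (for example via `DECR`) so that only one worker observes the counter reaching zero.

```javascript
// Hypothetical in-memory sketch of the proposed GroupQueue "unit"/"group" flow.
// Not a Bull API: a Map stands in for Redis, and all names are illustrative.
function createBatchTracker(onGroupComplete) {
  const store = new Map(); // stand-in for Redis keys

  return {
    // Register a batch and its child job ids before any child runs.
    start(batchId, childIds) {
      store.set(`batch:${batchId}:children`, childIds.slice());
      store.set(`batch:${batchId}:pending`, childIds.length);
      // An empty batch has nothing to wait for, so complete it immediately.
      if (childIds.length === 0) onGroupComplete(batchId, []);
    },

    // Called from the "unit" handler when one child job has finished.
    completeChild(batchId, childId, result) {
      const resultKey = `result:${batchId}:${childId}`;
      // Ignore duplicate completions (e.g. a retried job) so the counter
      // is decremented at most once per child.
      if (store.has(resultKey)) return;
      store.set(resultKey, JSON.stringify(result));

      // With Redis this read-modify-write must be atomic (e.g. DECR), so that
      // exactly one worker observes the counter reaching zero.
      const pending = store.get(`batch:${batchId}:pending`) - 1;
      store.set(`batch:${batchId}:pending`, pending);
      if (pending > 0) return;

      // Last child: collect results in the original child order and run
      // the "group" handler once.
      const childIds = store.get(`batch:${batchId}:children`);
      const results = childIds.map((id) =>
        JSON.parse(store.get(`result:${batchId}:${id}`))
      );
      onGroupComplete(batchId, results);
    },
  };
}

// Usage: sum salaries once every child job in the batch has reported back.
let totalSalary = null;
const tracker = createBatchTracker((batchId, infos) => {
  totalSalary = infos.reduce((sum, info) => sum + info.salary, 0);
});
tracker.start('batch-1', ['job-1', 'job-2', 'job-3']);
tracker.completeChild('batch-1', 'job-2', { salary: 20 });
tracker.completeChild('batch-1', 'job-1', { salary: 10 });
tracker.completeChild('batch-1', 'job-3', { salary: 30 });
console.log(totalSalary); // 60
```

Children may finish in any order; results are reassembled in the order the batch was registered, which mirrors how the `group` handler above rebuilds keys from `jobs`.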
Thank you for the example. I have actually worked a bit on figuring out a "chain" kind of API, since that would be really useful for creating complex flows. I will analyze this and try to figure out how such a chain mechanism could be implemented reliably.
Let's continue the discussion here: #340
Hey,
I've dug through Bull's source, and it appears there's no support for complex control flow / batched operations (chained or parallel). I'm looking for something akin to Celery primitives or Sidekiq Pro batches.
Any idea how this could be integrated into Bull, either as an extension or directly in the core?
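For comparison, Celery's canvas offers chains (run tasks in sequence, feeding each result into the next), groups (run tasks in parallel), and chords (a group followed by a callback that receives all of the group's results). The snippet below is a minimal, single-process sketch of those three ideas as plain Promise combinators. `chain`, `group`, `chord`, `fakeApi`, and `sumSalaries` are hypothetical names chosen for illustration, not part of Bull or Celery, and the in-memory `request` stands in for real HTTP calls.

```javascript
// Hypothetical helpers loosely modeled on Celery's canvas primitives.
// Not Bull or Celery APIs. A "task" is any function (input) => value or Promise.

// chain: run tasks one after another, passing each result to the next task.
function chain(...tasks) {
  return (input) =>
    tasks.reduce((prev, task) => prev.then(task), Promise.resolve(input));
}

// group: start all tasks on the same input; resolves to their results in
// task order, or rejects as soon as any task fails.
function group(tasks) {
  return (input) =>
    Promise.all(tasks.map((task) => Promise.resolve(input).then(task)));
}

// chord: run a group, then hand the array of its results to one callback task.
function chord(tasks, callback) {
  return chain(group(tasks), callback);
}

// Usage mirroring the thread: a fake in-memory `request` (hypothetical data).
const fakeApi = {
  '/list-users': ['/users/1', '/users/2'],
  '/users/1': { interestingInfos: { salary: 100 } },
  '/users/2': { interestingInfos: { salary: 250 } },
};
const request = async (url) => fakeApi[url];

const sumSalaries = chain(
  request, // '/list-users' -> array of user URLs
  // Fan out one task per URL (decided at runtime), then fan in with a sum.
  (urls) =>
    chord(
      urls.map((url) => () => request(url).then((user) => user.interestingInfos)),
      (infos) => infos.reduce((sum, info) => sum + info.salary, 0)
    )()
);

sumSalaries('/list-users').then((total) => console.log(total)); // 350
```

Because a chain step may build and run a new chord, the fan-out width can be decided at runtime, which is what the /list-users example needs. A queue-backed version would additionally have to persist each step's result and completion state (for instance in Redis) so the flow survives worker restarts.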