BBQ is a message queue abstraction library for PHP (5.3+). The library supports different queue types that you can mix as required by your application.
As the actual queue services are abstracted, you can use different queue types in different environments.
Via composer.json
"require": {
"eventio/bbq": "dev-master"
}
For Symfony2 projects, you can use EventioBBQBundle
You need at least one queue and register it with BBQ()
. A BBQ Queue acts as an
abstraction layer to a message queue service.
<?php
$bbq = new BBQ();
$queue = new DirectoryQueue('tasks', '/var/lib/bbq/email_tasks');
$bbq->registerQueue($queue);
After the queue is registered, you can consume the queue by
- pushing messages; or
- fetching messages
Push messages
$bbq->pushJob('tasks', new StringPayload('New task payload'));
BBQ::pushJob()
accepts to arguments:
- id of the queue where to push the job
- payload for the job
Fetch messages
$job = $bbq->fetchJob('tasks');
$payload = $job->getPayload();
echo $payload; // Outputs "New task payload"
$bbq->deleteJob($job);
BBQ::fetchJob()
accepts two arguments:
- id of the queue where from get a job (mandatory)
- optionally the timeout (seconds) how long we should wait for the task from the queue
As you can see from the example above, you should explicitly delete the job with deleteJob($job)
after
you have processed the job successfully. Otherwise, the job is most likely to be returned to the queue.
The actual behavior depends on the queue type.
A BBQ Queue combines the message queue service and the actual queue hosted by the service. When you use BBQ in your application, you don't need to know which service actually hosts the queue. You can easily use different queue types in different environments (dev, test, prod).
See Supported Queue Types below.
Each queue is registered with BBQ()
with an id. The id is any string that identifies the queue
in your application.
$queue = new DirectoryQueue('queue_id', '/tmp/queue');
DirectoryQueue persists jobs in files in the given directory.
Non-deleted but fetched jobs are returned to the queue as new jobs.
$pheanstalk = new \Pheanstalk_Pheanstalk('127.0.0.1');
$queue = new PheanstalkTubeQueue('queue_id', $pheanstalk, 'tube_name');
PheanstalkTubeQueue uses the Pheanstalk library to access
the configured beanstalkd server and one of it's tube. You need to pass an instance of \Pheanstalk_Pheanstalk
and
the name of the tube to the constructor.
Non-deleted but fetched jobs are returned to the queue when the script ends.
RedisQueue (Redis server)
$redis = new \Predis\Client();
$queueListKey = 'queue_key';
$queue = new RedisQueue('queue_id', $redis, $queueListKey);
RedisQueue uses the Predis PHP Library to access the configured Redis servers. The actual queue is implemented by a Redis list.
pushJob()
adds the job payload to the list using[LPUSH](http://redis.io/commands/lpush)
fetchJob()
fetches the job using[BRPOPLPUSH](http://redis.io/commands/brpoplpush)
or[RPOPLPUSH](http://redis.io/commands/rpoplpush)
finalizeJob()
deletes the job using[LREM](http://redis.io/commands/lrem)
from the processing queuereleaseJob()
moves the job back to the list queue using[RPOPLPUSH](http://redis.io/commands/lrem)
Non-deleted but fetched jobs are returned to the queue when the script ends.
RedisQueue uses a concept of processing queue to ensure the queue reliability also in case of the client
failures. Processing queue lives in a special key only between fetchJob()
and finalizeJob()
(or releaseJob()
) calls.
The processing queue key name is automatically constructed in fetchJob()
call and follows by default the pattern
<queue_name>:<host_name>:<pid>(random unique string)
. Read more about reliable queue pattern.
Queue Configuration
You can further customize the queue configuration by passing fourth argument to the queue constructor.
$queue = new RedisQueue('queue_id', $redis, $queueListKey, $configuration);
$configuration
should be an associative array. The default configuration (and possible variables) are following.
$configuration = array(
'processing_queue_key_prefix' => '%q:%h:%p',
'allow_infinite_blocking' => false,
'skip_shutdown_release' => false,
);
processing_queue_key_prefix
: The prefix pattern for the processing queue key.
There are a few placeholders that are replaced with actual values:
%q
main queue name, %h
hostname and %p
PHP process ID.
allow_infinite_blocking
: By default, if you do not pass any timeout
(or NULL
or 0
or false
) for fetchJob()
, RedisQueue will do
non-blocking [RPOPLPUSH](http://redis.io/commands/rpoplpush)
call instead of blocking [BRPOPLPUSH](http://redis.io/commands/brpoplpush)
. If the queue
contains no jobs, the function is returned immediately. If you set allow_infinite_blocking
to true
and
pass no timeout to fetchJob()
, the queue forces to use [BRPOPLPUSH](http://redis.io/commands/brpoplpush)
even
with no timeout (=infinite blocking). Use with care.
ship_shutdown_release
: By default, the queue registers a call that releases
possibly unreleased and unfinished but fetched jobs back to the queue. Set to true
to
disable the functionality.
$ironMQ = new \IronMQ(array(
'token' => 'YOUR_IRONMQ_TOKEN',
'project_id' => 'YOUR_IRONMQ_PROJECT_ID'
));
$queue = new IronMQQueue('queue_id', $ironMQ, 'queue_name');
IronMQQueue accesses Iron.io's IronMQ service over HTTP(S) Interface.
Non-deleted but fetched jobs are returned to the queue if they are not deleted after timeout has passed. The timeout is 60 seconds my default.
$queue = new ArrayQueue('queue_id');
The messages are stored in an array inside the queue object. Apparently this type of queue is not persistent between PHP processes and is useful mainly in testing.
The whole queue is destroyed when the script ends.
As the library is in its very early stages, you are more than welcome to contribute the work
- by fixing bugs
- by writing new tests
- by implementing new queue types
- by giving ideas and comments on the code
Copyright Eventio Oy, Ville Mattila, 2013
Released under the The MIT License