Command Queue and Scheduler for PHP7 and Redis
I built BusQue because I found a lack of choice of simple message queues for medium-sized PHP applications.
The name BusQue signifies Command Bus + Message Queue. It was designed to be used in conjunction with Tactician and Redis using either the PHPRedis or Predis clients, along with a serializer such as PHP serialize(), but is open to replacement with alternate adapters.
One key feature I found missing in other queues is the ability to assign a unique ID to a job, allowing the same job to be queued multiple times but have it only execute once after the last insertion.
Install with composer:
composer require mgdigital/busque
Or get the Symfony bundle:
composer require mgdigital/busque-bundle
You'll also need a Redis server to run the queues on.
To use BusQue you first need to instantiate an instance of
BusQue\Implementationwith its dependencies. A basic configuration could look something like this:
// The preferred client is PHPRedis: $client = new Redis(); $adapter = new BusQue\Redis\PHPRedis\PHPRedisAdapter($client);
// A Predis adepter is included, although Predis can have issues when used in long-running processes. // $client = new Predis\Client(); // $adapter = new BusQue\Redis\Predis\PredisAdapter($client);
$driver = new BusQue\Redis\RedisDriver($adapter);
// The PHP serializer should fit most use cases: $serializer = new BusQue\Serializer\PHPCommandSerializer();
// The MD5 generator creates an ID unique to the serialized command: $idGenerator = new BusQue\IdGenerator\Md5IdGenerator($serializer);
$implementation = new BusQue\Implementation( // Puts all commands into the "default" queue: new BusQue\QueueResolver\SimpleQueueResolver('default'), $serializer, $idGenerator, // The Redis driver is used as both the queue and scheduler: $driver, $driver, // Always returns the current time: new BusQue\SystemClock(), // Inject your command bus here: new BusQue\Tactician\CommandBusAdapter($commandBus), // Inject your logger here: new Psr\Log\NullLogger() );
$busQue = new BusQue\BusQue($implementation);
BusQue\Handler\ScheduledCommandHandlerclasses also needs to be registered with your command bus (Tactician). See the Tactician website for further information on using a command bus.
If you're using the Symfony bundle, then all of the above is done for you, and you can just get the
busqueservice from the container.
SendEmailCommandis a command which you've configured Tactician to handle:
workQueue('default'); // Hello Joe!
Or in your Symfony app run
app/console busque:queue_worker default
You need to run at least one worker instance for each of your queues, using something like supervisord.
Tip: If you want to see the commands being handled by the worker in the console, configure some logging middleware in Tactician, then run the
busque:queue_workercommand with the
handle(new BusQue\ScheduledCommand($command, new \DateTime('+1 minute')));
$busQue->scheduleCommand($command, new \DateTime('+1 minute'));
Only one scheduler worker is needed to manage the schedule for all queues. The scheduler worker's only job is to queue commands which are due. A queue worker must also be running to handle these commands.
workSchedule(); // 1 minute later... Hello Joe!
Or in your Symfony app run
This command is queued every time the stock level of a product changes, but we give the command an ID:
$uniqueCommandId = 'SyncStock' . $productId;
$commandBus->handle(new BusQue\QueuedCommand($command, $uniqueCommandId));
When you don't specify a unique command ID, one will be generated automatically.
What if the queue is busy and hasn't had time to process this command, before the stock level of this product changes a second time? The last thing we want is a duplicate of this message going into the queue, the stock level still only needs syncing once.
Because we identified the command by the product ID, it will only be allowed in the queue (or the scheduler) once at any given time.
Conversely, if you wanted to be able to issue the same command multiple times, and be sure the queue worker will run each copy of the command, you would have to ensure each copy of the command has a unique ID.
This behaviour works as follows:
Using the MD5IdGenerator will generate an ID consistently unique to the command and its payload. An alternate ID generator could be used if different behaviour is needed.
We can also check the number of items in any queue:
getQueuedCount($queueName); // 0
Queues are created automatically if they don't exist, using whichever queue name is returned from the
QueueResolverInterfaceadapter. A worker can work on a queue which doesn't exist yet. You need to make sure that if a new queue name is generated, there is a worker to receive the commands in that queue.
listQueues(); // ['SendEmailCommand', 'SyncStockCommand']
If you want to cancel a command for any reason, you can remove all trace of it with the following call:
listQueuedIds($queueName); // ['command1id', 'command2id']
listInProgressIds($queueName); // 
This method returns an unserialized command from BusQue based on its queue name and ID, leaving any messages in the queue untouched, and throwing a
BusQue\CommandNotFoundExceptionif the command was not found in the command store.
Further convenience methods can be found in the
See the test suite output on Travis CI:
Run the phpspec test suite:
bin/phpspec run -f pretty
And run the Behat acceptance suite:
By default the Behat suite will test integration with PHPRedis. Integration with Predis can also be tested:
bin/behat --profile predis
These tests will attempt to write to a Redis instance at
redis://redis:6379by default. You can configure an alternate test client by providing an alternate
FeatureContextclass extending either
A basic docker environment is included for testing.
cd docker docker-compose -f ./docker-compose.yml up docker exec -ti busque-php composer install docker exec -ti busque-php bin/behat