Cake-Resque, a CakePHP plugin to manage queue system

Cake-Resque is a CakePHP plugin allowing you to put some tasks in a background queue, and execute them later.

Update 2012-02-17: Refer to the GitHub page for updated documentation
Update 2012-09-05 : Refer to official website for up-to-date API. CakeResque 1.1 comes with a lot of new features.

Background

Cake-Resque is based on Resque, written by defunkt and used at GitHub to process background jobs. Resque uses Redis to store and retrieve jobs, which makes it very fast. Among the advantages of Redis listed in the Resque presentation are:

  • Atomic, O(1) list push and pop
  • Ability to paginate over lists without mutating them
  • Queryable keyspace, high visibility
  • Fast
  • Easy to install – no dependencies
  • Reliable Ruby client library
  • Store arbitrary strings
  • Support for integer counters
  • Persistent
  • Master-slave replication
  • Network aware

In addition, Redis keeps its data in RAM. It is a very fast database, used for I/O-heavy tasks such as storing sessions, incrementing counters and caching. Resque uses it to implement its queue structure, since Redis can push data to and pop data from a list in O(1).

Resque is written in Ruby, and Chris Boulton has done a good job porting it to PHP. Cake-Resque is based on the CakePHP PHP-Resque Plugin by Mike Smullin.

Requirements

Installing Redis is fairly easy: on a *nix system, it takes just 4 lines in the terminal and voilà! (Download redis).

PHP-Resque is included in Cake-Resque, so you don’t have to download it separately. The included version is the official one, which uses Redisent as its Redis PHP API.

Extra
I’ve made some modifications and released another version of php-resque that uses phpredis. The phpredis version should be slightly faster, since phpredis is a native PHP extension written in C, whereas Redisent is plain PHP. The trade-off is that you have to compile and install that extension. Installation details are available on the phpredis GitHub page. Just download my version, and replace the php-resque folder in the plugin’s Vendor directory.

Installing

Clone or download the plugin from the Github repository, and put it in your cake Plugin directory. The plugin’s folder should be named ‘Resque’.

Configuring

Resque options are in Resque/Config/bootstrap.php. There, you can set up the redis server address, and other various options when creating a worker.

Then load that bootstrap file into Cake via your app/Config/bootstrap.php. How you do it depends on how you already load your existing plugins. Mine looks like this:

CakePlugin::loadAll(array(
    'Resque' => array('bootstrap' => true)
));

Finally, load the ResqueComponent in your AppController:

public $components = array(/* your other components, */ 'Resque.Resque');

Starting the Resque service

In a queue system, to defer a task, you put it in a temporary place; then, at a fixed interval, someone else goes there to take it and execute it. In the jargon, that ‘someone’ is called a worker, and the task a job.

Jobs are stored in a Redis list. A worker retrieves the oldest job in the list (which also removes it from the list), executes it, and repeats until the list is empty. It then sleeps for X seconds before doing the same thing again.
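
The poll-and-drain cycle described above can be sketched without Redis at all. This is only an illustration: SplQueue stands in for the Redis list, and the function name drainQueue and the second job class UpdateFeed are hypothetical names, not part of the plugin.

```php
<?php
// Hypothetical in-memory stand-in for the Redis list "queue:default".
$queue = new SplQueue();

// Producer side: append jobs to the tail of the list (the role of Redis RPUSH).
$queue->enqueue(array('class' => 'SendNotification', 'args' => array('an@email.com')));
$queue->enqueue(array('class' => 'UpdateFeed', 'args' => array(42)));

/**
 * Process every job currently waiting, oldest first.
 * Returns the class names in the order they were handled.
 */
function drainQueue(SplQueue $queue)
{
    $processed = array();
    while (!$queue->isEmpty()) {
        // Take the job at the head of the list (the role of Redis LPOP);
        // taking it also removes it from the queue.
        $job = $queue->dequeue();
        $processed[] = $job['class'];
    }
    return $processed;
}

$processed = drainQueue($queue);

// A real worker would now sleep for the polling interval and start over:
// sleep(5);
echo implode(', ', $processed), PHP_EOL;
```

Because jobs are added at one end and taken from the other, the oldest job is always handled first.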

That worker should be running permanently, and thus should be started from a shell.

The plugin shell is accessible via the cake console:

cake Resque.resque

Available subcommands are:

Command    Description
start      Start a new Resque worker.
stop       Stop all Resque workers.
restart    Stop all Resque workers, and start a new one.
stats      View stats about processed/failed jobs.
tail       View the tail of the workers’ logs.
jobs       Display a list of all available jobs.

We will walk through each command.

Start

In your terminal, type

cake Resque.resque start

to start a new worker, with default options (defined in Plugin/Resque/Config/bootstrap.php).

You can override some options using :

-u your_user : start the worker process as your_user; the user must already exist on the system,
-q queue1,queue2,queue3 : use other queues instead of ‘default’. Multiple queues must be separated by commas; queues are created on start,
-i X : set the number of seconds between each poll (default : 5),
-n Y : set the number of workers to fork (default : 1)

You can create as many workers as you want by using the -n option, or simply by running cake Resque.resque start again to create a new one. Each worker has its own process and, when executing a job, forks another process to do it.
The -n option uses pcntl to fork the process. Ensure that it’s enabled on your server before using it.
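
To give an idea of what forking with pcntl looks like, here is a minimal sketch. It is not the plugin’s code: $workerCount is a hypothetical variable playing the role of the -n option, and the child processes exit immediately instead of polling a queue.

```php
<?php
// Assumption: $workerCount plays the role of the -n option.
$workerCount = 2;
$pids = array();

if (!function_exists('pcntl_fork')) {
    // The pcntl extension is missing or disabled on this server.
    echo "pcntl is not available; start workers one at a time instead.", PHP_EOL;
} else {
    for ($i = 0; $i < $workerCount; $i++) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            // Fork failed; stop trying.
            break;
        }
        if ($pid === 0) {
            // Child process: a real worker would start polling the queue here.
            exit(0);
        }
        // Parent process: remember the child's PID.
        $pids[] = $pid;
    }

    // Wait for every child so that none is left behind as a zombie.
    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status);
    }
    echo count($pids), " worker process(es) forked", PHP_EOL;
}
```

The function_exists() check is the quickest way to find out whether pcntl is enabled before relying on -n.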

Once started, the worker polls the Redis server every X seconds. You can watch its activity using the Redis monitor command:

redis-cli monitor

You should see a new line like this one every X seconds:

1320705855.609884 "LPOP" "queue:default"

-> It’s working! The worker is ‘popping’ the queue named default.

Stop

To stop all the workers (because you can’t stop a single worker yet):

cake Resque.resque stop

This will stop and shut down all the workers. Workers that are still processing jobs will shut down once their jobs are done. To force an immediate shutdown (which will abort all jobs in progress and mark them as failed), use the -f option.

Restart

To restart the workers :

cake Resque.resque restart

All start and stop options are also available for restart.

Tail

Once started, each worker logs its actions in the app/tmp/logs/php-resque-worker.log file. You can tail that log using:

cake Resque.resque tail

or just add the -f option when starting a worker. tail itself takes neither arguments nor options.

Stats

stats also takes neither arguments nor options.

It displays the total number of processed and failed jobs since the Redis server started, and some stats for each worker. Use stats when you want to check that the workers were started correctly.

Jobs

jobs prints a list of all available jobs.

But we need to create the jobs first.

Creating a job

First, a job is a class with a perform() method, which the worker calls when processing the job. We can take advantage of Cake shells for the sake of DRY: a job is a shell. You should take a look at the CakePHP documentation about shells. Understanding how a shell works, how to load and use models, and so on is essential for creating a job, since it runs as a shell.

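class SendNotificationShell extends Shell
{
    // Default shell entry point: cake send_notification
    public function main()
    {
        $this->out("Hello World !");
    }

    // Regular shell command: cake send_notification send
    public function send()
    {
        // Send some notifications
    }

    // Entry point called by a Resque worker when it processes the job
    public function perform()
    {
        // Initialize the shell manually, since the Cake dispatcher is bypassed
        $this->initialize();
        // do something
    }
}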

The class above is just a regular Cake shell, which you could call via

cake send_notification send

The magic happens when you add the perform() method to your shell, because Resque will parse all your shells and decide which ones can be used as jobs depending on whether that method exists.
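
The idea of picking jobs by the presence of perform() can be illustrated with PHP’s method_exists(). This is a sketch of the principle, not the plugin’s actual discovery code: the helper findJobClasses and the CleanupShell class are hypothetical, and the classes here do not extend Shell so that the example runs on its own.

```php
<?php
// Two hypothetical shell classes; only the first one defines perform().
class SendNotificationShell
{
    public function perform()
    {
        // job logic would go here
    }
}

class CleanupShell
{
    public function main()
    {
        // regular shell command, not usable as a job
    }
}

/** Keep only the class names that expose a perform() method. */
function findJobClasses(array $classNames)
{
    $jobs = array();
    foreach ($classNames as $className) {
        if (method_exists($className, 'perform')) {
            $jobs[] = $className;
        }
    }
    return $jobs;
}

$jobs = findJobClasses(array('SendNotificationShell', 'CleanupShell'));
print_r($jobs);
```

Only SendNotificationShell ends up in the list, because CleanupShell has no perform() method.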

You must call $this->initialize() in the perform() method in order to load any models the shell uses. That method then works like any other method in the shell, but can also be called by a Resque worker. Two for the price of one!

We are calling the shell class through the back door, bypassing the Cake dispatcher, hence the need to initialize some things manually with initialize(). It’s dirty, and I’m open to suggestions if you have a more elegant way.

All your shell classes should be located in app/Console/Command/

We now have the workers and the jobs, so we can add a job to the queue for the workers to execute.

Enqueuing a job

The console shell provides an enqueue command, allowing you to enqueue a specific job:

cake enqueue default SendNotificationShell 'an@email.com', 'Hi! how are you ?'

This will enqueue the job SendNotificationShell in the default queue, with an@email.com and the string Hi! how are you ? as arguments. You can add as many arguments as you like.

When a worker comes across that job, it will instantiate the class SendNotificationShell and execute its perform() method with the given arguments.
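
What the worker does with a job it has just popped can be sketched as follows. The GreetingJob class and the runJob helper are hypothetical, and the payload layout (a class name plus an args array) is an assumption made for the illustration.

```php
<?php
// Hypothetical job class; in Cake-Resque this would be a shell with perform().
class GreetingJob
{
    public $args = array();
    public $sent = array();

    public function perform()
    {
        // $this->args holds whatever was passed when the job was enqueued.
        $this->sent[] = $this->args[0] . ': ' . $this->args[1];
    }
}

/** Instantiate the class named in the payload, hand it the args, run it. */
function runJob(array $payload)
{
    $className = $payload['class'];
    $job = new $className();
    $job->args = $payload['args'];
    $job->perform();
    return $job;
}

$job = runJob(array(
    'class' => 'GreetingJob',
    'args'  => array('an@email.com', 'Hi! how are you ?'),
));
echo $job->sent[0], PHP_EOL;
```

The class name travels through the queue as a plain string, which is why the worker must be able to load the class itself.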

Obviously, enqueuing a job via the shell is for debugging only. You can enqueue jobs directly from within your application, in plain PHP code. To enqueue a job, e.g. in your model:

class Test extends AppModel
{
    public function afterSave()
    {
        Resque::enqueue('default', 'SendNotification', array('an@email.com', 'holy cow !'));
    }
}

As you can see, you get more freedom in the argument types, as we can pass an array, associative if you like, or the entire $this->data (though I don’t recommend it). The array will be serialized and stored in the queue. You can access it (unserialized) via $this->args inside the job’s perform() method.
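
To see why small arrays are a better fit than a whole $this->data, here is a sketch of the round trip the arguments go through. It assumes the payload is stored as a JSON string; the exact storage format is an assumption here, and the keys in $args are made up for the example.

```php
<?php
// Assumption: the job payload is stored in the queue as a JSON string.
$args = array(
    'to'      => 'an@email.com',
    'message' => 'holy cow !',
    'user_id' => 42,
);

// What gets written to the queue when the job is enqueued.
$stored = json_encode(array('class' => 'SendNotification', 'args' => $args));

// What the worker gets back when it pops the job.
$restored = json_decode($stored, true);

echo $restored['args']['to'], PHP_EOL;
```

Scalars and arrays survive the trip unchanged, so passing a record ID and reloading the record inside perform() keeps the payload small and avoids working on stale data.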

Don’t forget to restart the workers whenever you make changes to your job classes.

I’m using this plugin on one of my websites to defer some tasks, like sending notifications about a user’s activity to all of their friends, updating the user activity feed, logging user actions, etc.

Since the website is still in development, I haven’t had the opportunity to use Resque at its full power, and cannot test some areas, such as multiple workers on different Redis shards or on different servers. And of course, I cannot see how it will react in production with 50 jobs per minute.

Feedback is welcome, and please report bugs and other issues on the project’s issue tracker.