Using Drupal's Queue API

One of the additions to Drupal 7 that went in without a lot of fanfare was the Queue API. The Queue API is designed to manage tasks. It's very similar to the way we make to-do lists for household chores. If you have a hundred things to do in a day, you need to write them down and cross them off the list as you do them. Otherwise you lose track of what's left. In Drupal terms, these tasks would be data processing -- you have some data that needs to be used to do something else. The most obvious use case for the Queue API is in aggregating data (feeds), where you have a huge batch of items that need to be turned into Drupal content.

Before the Queue API, you could either keep this list of items in memory, or you could write them to a cache. The problem with keeping them in memory is that if you have a timeout or hit the memory limit, all that data is gone and you need to start from scratch (assuming you can still access the data). The problem with caching them is that they are still in a huge group, so if the list is really big, you're wasting a huge amount of memory reading and writing the entire thing on every request.

The Queue API is an attempt to address this problem by providing a simple, reliable way to keep track of these huge lists of tasks without requiring you to know anything about the whole group. You just prepare your data, put it in one item at a time, and ask for items back one at a time to process. When you're done with a single item, you just report back that you've finished, and the item is removed from the queue (checked off the list). Easy-peasy. So you'd use the Queue API to overcome memory/time limit problems, and to simplify your development by dealing with single items instead of huge lists.

Sample Code

For examples, I'll be using two dummy functions. First, the source of our data.

<?php
//Simulates a bunch of data that could come from a remote source.
function remoteData() {
  return array(
    array('title' => 'Sample Item 1', 'type' => 'article'),
    array('title' => 'Sample Item 2', 'type' => 'article'),
    array('title' => 'Sample Item 3', 'type' => 'article'),
    array('title' => 'Sample Item 4', 'type' => 'article'),
    array('title' => 'Sample Item 5', 'type' => 'article'),
  );
}
?>

The second dummy function just converts a single item's data from our array above into a node and saves it. Note that it receives the item's data directly (an array), not the queue item wrapper.

<?php
//Very basic example: just convert the item's data into a node and save it.
function saveRemoteItem($data) {
  $node = (object) $data;
  node_save($node);
  return !empty($node->nid);
}
?>

So let's see what some sample code looks like for actually using the Queue API.

<?php
//Get the queue so we can add to it. Use a
//descriptive name. It's ok if it doesn't exist yet.
$queue = DrupalQueue::get('myQueue');

//Push all the items into the queue, one at a time.
//You can push in any kind of data (arrays, objects, etc.).
foreach (remoteData() as $item) {
  $queue->createItem($item);
}

//Pull items out one at a time.
while ($item = $queue->claimItem()) {
  //Try saving the data.
  if (saveRemoteItem($item->data)) {
    //Good, we succeeded. Delete the item as it is no longer needed.
    $queue->deleteItem($item);
  }
  else {
    //You might want to log to watchdog and delete the item
    //anyway. We'll just ignore the failure for our example.
  }
}
?>
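As an aside, the failure branch doesn't have to swallow the error. If an item is worth retrying, you can release your claim on it instead of deleting it: releaseItem() puts the item back in the queue so a later claimItem() can pick it up again. A sketch of that variant:

<?php
while ($item = $queue->claimItem()) {
  if (saveRemoteItem($item->data)) {
    $queue->deleteItem($item);
  }
  else {
    //Log the failure and put the item back in the queue so it
    //can be claimed and retried on a later run.
    watchdog('mymodule', 'Failed to save queue item.', array(), WATCHDOG_WARNING);
    $queue->releaseItem($item);
  }
}
?>

Be careful with this pattern: an item that always fails will be retried forever unless you track attempts and eventually delete it.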

Cool, but the only benefit we've seen here is the persistence of data, which we're actually not taking advantage of here. Even if we lose access to the incoming items, we have them stashed away in our queue so we could process them separately if need be. Let's go for something a little more useful (and paradoxically, simpler), like setting up a queue to do the same work we did above each time cron is run.

<?php
/**
 * Implements hook_cron().
 */
function mymodule_cron() {
  $queue = DrupalQueue::get('myCronQueue');
  foreach (remoteData() as $item) {
    $queue->createItem($item);
  }
}

/**
 * Implements hook_cron_queue_info().
 */
function mymodule_cron_queue_info() {
  $queues = array();
  $queues['myCronQueue'] = array(
    //Function to call for each item; it receives the item's data.
    'worker callback' => 'saveRemoteItem',
    //Maximum number of seconds to spend working on the queue per cron run.
    'time' => 60,
  );
  return $queues;
}
?>

It looks like we're missing the actual working part of our cron, but by implementing hook_cron_queue_info(), we've told system.module that we have a queue that should be worked on during cron (for a maximum of 60 seconds each run), and what to do with the data. So all we have to do is use our own hook_cron() to push data into the queue, and items will be pulled out automatically and their data passed to our worker callback. Told you it was simple.
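Under the hood, core's cron run does roughly the same claim/work/delete loop we wrote by hand earlier. Something along these lines (a simplified sketch of the idea, not the exact core code):

<?php
//Simplified sketch of how core works a cron queue.
$info = mymodule_cron_queue_info();
$function = $info['myCronQueue']['worker callback'];
$end = time() + $info['myCronQueue']['time'];
$queue = DrupalQueue::get('myCronQueue');
while (time() < $end && ($item = $queue->claimItem())) {
  //The worker callback receives the item's data, not the wrapper object.
  $function($item->data);
  $queue->deleteItem($item);
}
?>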

Advanced Stuff

There are some important features of the Queue API I haven't touched on yet. The first is that the queue is (by default) reliable. This means that while you're working on a single item, it is "locked" and won't be picked up by anyone else. This also means that the queue is safe for running multiple cron jobs at once. The second thing is that the implementation of the queue is completely decoupled from your code -- if a site gets extremely busy and the queue turns out to be a bottleneck, it's relatively easy to switch from a database-backed queue to a memory-only queue or an external queue such as ActiveMQ. Last but not least, the idea of queueing work is new to Drupal, but has a long history within other parts of computing. While the cron queue is a particularly obvious use for it, it seems like we are only scratching the surface of what can be done with this system, particularly when it comes to delegating work that PHP is not very good at out to a daemonized process on the server (or even a remote one, for that matter).
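To make the backend swap concrete: DrupalQueue::get() checks a per-queue variable before falling back to the default queue class, so switching implementations is a configuration change, not a code change. A sketch (MemoryQueue ships with core; 'ActiveMQQueue' is a hypothetical class name a contrib module might provide):

<?php
//Use the in-memory queue for this one queue only.
//Note: items in a MemoryQueue won't survive past the current request.
variable_set('queue_class_myQueue', 'MemoryQueue');

//Or change the default backend for every queue that doesn't set its own class.
//'ActiveMQQueue' is a hypothetical class from a contrib module.
variable_set('queue_default_class', 'ActiveMQQueue');

//If your code requires persistence, ask for a reliable queue explicitly.
//A configured backend that isn't reliable will be ignored in favor of
//the reliable default (SystemQueue, the database-backed implementation).
$queue = DrupalQueue::get('myQueue', TRUE);
?>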