Using Drupal 8 Queues To Poll Orders From External APIs

Drupal 8 is a powerful CMS which offers so many things, from a fantastic mobile experience to seamless content management. Under the hood, it uses Symfony components to make life as a developer a lot easier, which is quite a jump from Drupal 7’s procedural methods.

I had to quickly learn object-oriented programming to get in line with Drupal 8, and have managed to utilize the power it offers to create a system which integrates with a host of third-party APIs, all running nicely using Drupal’s amazing queue API.

This article presumes you have knowledge of PHP, Drupal 8, creating custom modules in Drupal 8 and the concept of RESTful APIs.

Background
We needed to build a system which would constantly poll various APIs to grab order information in order of last-updated time, from past to present. In this post, I will be discussing Amazon MWS in particular.

Our main issue was that on our old system, we were constantly requesting data from Amazon and because we didn’t have a queuing system, we forced the scripts to “sleep” to avoid being throttled (you can only request 100 orders at a time).

Amazon MWS, like a lot of other APIs, uses a variation of the leaky bucket algorithm, which means you have to find a balance between the maximum request quota and the restore rate, otherwise you will receive throttling errors from them.
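
To make the quota and restore rate idea concrete, here is a minimal in-memory sketch. It is not Amazon's implementation: the RequestBucket class is hypothetical, and the numbers (a quota of 6, one request restored every 60 seconds) are assumptions chosen to mirror the values used later in this article.

```php
<?php

/**
 * Hypothetical model of a request quota that refills over time.
 */
final class RequestBucket {
  private $maxQuota;
  private $restoreSeconds;
  private $available;
  private $lastRestore;

  public function __construct(int $maxQuota, int $restoreSeconds, int $now) {
    $this->maxQuota = $maxQuota;
    $this->restoreSeconds = $restoreSeconds;
    // The bucket starts with the full quota available.
    $this->available = $maxQuota;
    $this->lastRestore = $now;
  }

  /**
   * Tries to spend one request at time $now; returns FALSE when throttled.
   */
  public function tryRequest(int $now): bool {
    // Restore one request for every full restore period that has elapsed.
    $restored = intdiv($now - $this->lastRestore, $this->restoreSeconds);
    if ($restored > 0) {
      $this->available = min($this->maxQuota, $this->available + $restored);
      $this->lastRestore += $restored * $this->restoreSeconds;
    }
    if ($this->available === 0) {
      return FALSE;
    }
    $this->available--;
    return TRUE;
  }
}

// Assumed values: 6 requests in the quota, one restored every 60 seconds.
$bucket = new RequestBucket(6, 60, 0);
$results = [];
for ($i = 0; $i < 7; $i++) {
  // Seven requests fired at the same moment (time 0).
  $results[] = $bucket->tryRequest(0);
}
// One restore period later, a single request is available again.
$afterRestore = $bucket->tryRequest(60);
```

In this model the first six calls succeed and the seventh is refused until a restore period has passed, which is the behaviour you have to plan your polling around.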

The issue with using PHP’s “sleep” is that it will eventually lead to issues with maximum execution time and hold up other processes if you request a lot of data at once. The natural solution is to use a queuing system. So how do we do this with Drupal 8?

1. Create your own module
Create your module skeleton in Drupal 8. I use Drupal console to do this.

2. Use a third-party Amazon MWS SDK or write your own
You can either use Amazon MWS’s SDK or write your own API integration. I prefer doing the latter as it gives me more understanding and control over my code. I tend to write these in separate modules in Drupal as they serve different purposes.

3. Create your queue plugin
The Drupal 8 queuing system works on the basis that you send items to it to be processed. In order to harness the power of the queues and create your own queue, you need to extend the QueueWorkerBase plugin which makes this all possible.

To do this, make sure you have the “src” directory in the root of your module. You then need to create the folder structure “src/Plugin/QueueWorker”, and inside here you create your class (make sure the filename has the same name). In this case, we’ll call it AmazonOrdersQueueWorker.

Make sure it is namespaced correctly (i.e. in this instance it’s Drupal\my_module\Plugin\QueueWorker: after my_module, list any folders below “src”). Give this file the skeleton of a QueueWorkerBase, and in the annotation give it a unique ID and a relevant title. We’ll come back to this in a little while.

namespace Drupal\my_module\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;

/**
 * Processes tasks for Amazon Orders queue.
 *
 * @QueueWorker(
 *   id = "get_orders_from_amazon",
 *   title = @Translation("Queue worker for importing Amazon items."),
 * )
 */
class AmazonOrdersQueueWorker extends QueueWorkerBase {

  /**
   * {@inheritdoc}
   */
  public function processItem($item) {

  }

}
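
The namespace-to-folder convention described above can be sketched as a small function. The module_class_to_path() helper is hypothetical and exists only to illustrate the mapping; Drupal's autoloader does this work for you.

```php
<?php

/**
 * Maps a module class name to its expected file path inside the module.
 *
 * Hypothetical helper for illustration only.
 */
function module_class_to_path(string $class, string $module): string {
  $prefix = 'Drupal\\' . $module . '\\';
  if (strpos($class, $prefix) !== 0) {
    throw new InvalidArgumentException("$class is not in the $module namespace.");
  }
  // Everything after the module prefix mirrors the folders below "src".
  $relative = substr($class, strlen($prefix));
  return 'src/' . str_replace('\\', '/', $relative) . '.php';
}

$path = module_class_to_path(
  'Drupal\my_module\Plugin\QueueWorker\AmazonOrdersQueueWorker',
  'my_module'
);
// $path is now 'src/Plugin/QueueWorker/AmazonOrdersQueueWorker.php'.
```

If the namespace and the folder structure disagree, the class will not be found, which is why the filename has to match the class name.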

4. Create your own service to fetch data
You’re better off creating your own service if you want other modules to be able to kick off your functions. I like to put them in a folder called “Service” under the “src” directory, just so it remains organized (you don’t have to do this – I just do this for my own sanity!). Create your new class and again, make sure it is namespaced correctly.

Our service is likely going to need access to other services. The best way to do this is to use dependency injection so you don’t end up instantiating classes that don’t need to be recreated. One way to have access to Drupal’s container (and thus dependency injection) is by implementing ContainerInjectionInterface on your class.

use Drupal\Core\DependencyInjection\ContainerInjectionInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

class AmazonOrderMapper implements ContainerInjectionInterface {

Then, you can use the method “create” to pull in all the services you need access to, which are then passed as arguments to the constructor (the Drupal documentation has a list of the core services).

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('database'),
      $container->get('queue'),
      $container->get('state'),
      $container->get('logger.factory')
    );
  }

The services pulled in above are:

  • Database connection – to communicate with the database (I tend to use custom database queries a lot rather than entity queries; you can do either, but you’ll need different services accordingly)
  • Queue – the queue factory you need to interact with queues in Drupal 8
  • State – to assign various states to make sure we don’t re-run our function multiple times
  • Logger factory – to log errors/notices

You then need to mirror these in the constructor. Notice that with the queue factory, you can get a particular queue by its ID. Because we’re only interacting with one queue here, you can just grab your queue by the ID and store this rather than the QueueFactory itself.

  /**
   * Constructs a new AmazonOrderMapper object.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   The database query object.
   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
   *   The queue factory object.
   * @param \Drupal\Core\State\State $state
   *   State object.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $loggerFactory
   *   The logger factory object.
   */
  public function __construct(Connection $connection, QueueFactory $queueFactory, State $state, LoggerChannelFactoryInterface $loggerFactory) {
    $this->connection = $connection;
    $this->queue = $queueFactory->get('get_orders_from_amazon', TRUE);
    $this->state = $state;
    $this->logger = $loggerFactory->get('my_module');
  }

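Finally, to make your service accessible elsewhere and so Drupal 8 knows which services to inject, you need to define a my_module.services.yml file in the root of your module, listing the arguments in the same order as the constructor parameters above. (For a service defined this way, the container passes these arguments straight to the constructor; create() is only used when the class is instantiated through Drupal’s class resolver.)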

services:
  my_module.order_mapper:
    class: Drupal\my_module\Service\AmazonOrderMapper
    arguments: ['@database', '@queue', '@state', '@logger.factory']

Great! You’ve got your service all ready.
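
To show why the argument order in the YAML file has to match the constructor, here is a heavily simplified sketch of what a container does with such a definition. Everything in it is a stand-in: ExampleMapper, build_service() and the string "services" are hypothetical, and the real Symfony container does far more than this.

```php
<?php

/**
 * Stand-in for a service with two constructor dependencies.
 */
final class ExampleMapper {
  public $connection;
  public $logger;

  public function __construct(string $connection, string $logger) {
    $this->connection = $connection;
    $this->logger = $logger;
  }
}

/**
 * Builds a service from a definition, resolving "@id" references in order.
 */
function build_service(array $definition, array $services) {
  $arguments = [];
  foreach ($definition['arguments'] as $argument) {
    // Strip the leading "@" and look the dependency up by its ID.
    $arguments[] = $services[substr($argument, 1)];
  }
  $class = $definition['class'];
  // Arguments are passed positionally, so their order must match the
  // constructor signature.
  return new $class(...$arguments);
}

// Plain strings stand in for real service objects.
$services = [
  'database' => 'database connection',
  'logger.factory' => 'logger factory',
];
$definition = [
  'class' => ExampleMapper::class,
  'arguments' => ['@database', '@logger.factory'],
];
$mapper = build_service($definition, $services);
```

Swapping the two entries in the arguments list would hand the logger to the parameter expecting the database connection, which in real code surfaces as a type error.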

5. Create your function to get orders
So this is an example of how I get orders and put them into a queue. Again, I’m using my own custom classes to interact with Amazon MWS.

/**
   * Get orders from Amazon updated between various dates.
   * @param bool $withItems
   *    When set to true, this will fetch order items along with orders.
   * @throws \Exception
   */
  public function getLastUpdatedOrders($withItems = true) {
    if ($this->canRequestMoreOrders()) {
      // Grab the order with the latest updated date from the database -
      // this is how we know when we last finished requesting.
      if ($date = $this->getLastSavedOrderDate('Amazon')) {
        $updatedAfter = $date;
      } else {
        $updatedAfter = new \DateTime('1st October 2017');
        $updatedAfter->setTime(0, 0, 0);
      }
      // Only request orders updated up to two minutes ago (Amazon requires the
      // end of the range to be at least two minutes in the past).
      $updatedBefore = new \DateTime('now - 2 minutes');
      // Create request parameters, as we will send this to the queue to
      // build the request.
      $requestParameters = new ListOrdersRequestParameters();
      // Set parameters.
      $requestParameters->setWithItems($withItems);
      $requestParameters->setLastUpdatedAfter($updatedAfter);
      $requestParameters->setLastUpdatedBefore($updatedBefore);
      $requestParameters->setOrderStatus(['Shipped']);
      $requestParameters->setMarketplace(['UK', 'DE', 'FR', 'IT', 'ES']);
      // Setting max results per page may reduce the throttling issues.
      $requestParameters->setMaxResultsPerPage(100);
      // Add the request parameters to the get_orders_from_amazon queue.
      $this->queue->createItem($requestParameters);
      $this->setLatestOrdersState('Pending');
      $this->logger->notice('Queued Amazon request for dates: %updatedAfter to %updatedBefore.', ['%updatedAfter' => $updatedAfter->format('Y-m-d H:i:s'), '%updatedBefore' => $updatedBefore->format('Y-m-d H:i:s')]);
    } else {
      $this->logger->notice('Skipping request for latest orders.');
    }
  }

  /**
   * Function to decide whether to request more orders or not.
   * @return bool
   */
  private function canRequestMoreOrders() {
    // Check to see how many items are in the queue.
    $numberOfQueueItems = $this->queue->numberOfItems();
    if ($numberOfQueueItems == 0) {
      $this->state->set('my_module_latest_orders.status', 'Empty');
      return true;
    } elseif ($numberOfQueueItems <= 2) {
      if ($status = $this->state->get('my_module_latest_orders.status')) {
        if ($status !== 'Pending' && $status !== 'Processing') {
          $this->logger->notice('Status is: "%status".', array('%status' => $status));
          return true;
        }
        $this->logger->notice('Skipping request for latest orders: Previous request status is: "%status".', array('%status' => $status));
      }
    } else {
      $this->logger->notice('Skipping request for more orders: More than 2 items in the queue');
    }
    return false;
  }

  /**
   * Set/update variables for the latest updated Amazon orders.
   * 
   * @param string $status
   *    The state the function is in.
   */
  public function setLatestOrdersState($status) {
    $this->state->set('my_module_latest_orders.status', $status);
  }

The code above first checks whether we can request more orders. A Cron job fires this function, and we don’t want to keep making the same call while the previous one is unfinished, because we may get a lot of pages back from Amazon depending on how far back we are requesting. We allow a maximum of 2 items in the queue so that requests don’t overlap.
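
The gating rule can be pulled out into a pure function, which makes it easier to reason about and to test. This is a sketch that mirrors canRequestMoreOrders() above without the logging or the state service; the function name is hypothetical.

```php
<?php

/**
 * Decides whether a new request may be queued.
 *
 * @param int $itemsInQueue
 *   How many items are currently in the queue.
 * @param string|null $status
 *   The stored status of the previous request, or NULL if none is stored.
 */
function can_request_more_orders(int $itemsInQueue, ?string $status): bool {
  if ($itemsInQueue === 0) {
    // An empty queue means nothing is in flight.
    return TRUE;
  }
  if ($itemsInQueue <= 2) {
    if (!$status) {
      // No stored status: stay on the safe side and skip.
      return FALSE;
    }
    // Only continue when the previous request is neither waiting nor running.
    return !in_array($status, ['Pending', 'Processing'], TRUE);
  }
  // More than 2 items queued: never add another request.
  return FALSE;
}

$emptyQueue = can_request_more_orders(0, 'Processing');
$stillRunning = can_request_more_orders(1, 'Processing');
$finished = can_request_more_orders(2, 'Complete');
$tooMany = can_request_more_orders(3, 'Complete');
```

Here $emptyQueue and $finished are TRUE, while $stillRunning and $tooMany are FALSE.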

We then build the parameters for the request. If the database holds an order with a last-updated timestamp, we use the latest one as the “updatedAfter”; if not, we fall back to a preset date. We then send the request parameters to the queue as an item.
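
The date window logic described above can be isolated like this. It is a sketch rather than the code from my module: build_order_window() is a hypothetical name, and it uses DateTimeImmutable (instead of the mutable DateTime used earlier) and takes “now” as a parameter so the result is predictable.

```php
<?php

/**
 * Works out the [updatedAfter, updatedBefore] window for a request.
 *
 * @param \DateTimeImmutable|null $lastSaved
 *   The latest updated date stored locally, or NULL if there is none.
 * @param \DateTimeImmutable $now
 *   The current time.
 */
function build_order_window(?DateTimeImmutable $lastSaved, DateTimeImmutable $now): array {
  // Preset start date used when nothing has been saved yet.
  $fallback = new DateTimeImmutable('2017-10-01 00:00:00', $now->getTimezone());
  $updatedAfter = $lastSaved ?? $fallback;
  // The end of the window is two minutes before now.
  $updatedBefore = $now->modify('-2 minutes');
  return [$updatedAfter, $updatedBefore];
}

$now = new DateTimeImmutable('2018-01-01 12:00:00', new DateTimeZone('UTC'));
[$updatedAfter, $updatedBefore] = build_order_window(NULL, $now);
// With no saved orders the window starts at the preset date and ends
// at 11:58:00 on the day of the request.
```

Passing the current time in, rather than calling “now” inside the function, is a design choice that keeps the function easy to test.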

6. Create your function to process items in the queue
Going back to our queue class, we can now build the function that processes the items. Again, I’m going to want to utilize services, and because queue workers are plugins, our worker class needs to implement ContainerFactoryPluginInterface (as well as extending QueueWorkerBase) so we have access to the container.

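use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\my_module\Service\AmazonOrderMapper;
use GuzzleHttp\ClientInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;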

/**
 * Processes tasks for Amazon Orders queue.
 *
 * @QueueWorker(
 *   id = "get_orders_from_amazon",
 *   title = @Translation("Queue worker for importing Amazon items."),
 * )
 */
class AmazonOrdersQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

We’ll then inject our relevant services:

/**
   * Constructs a new AmazonOrdersQueueWorker object.
   *
   * @param array $configuration
   *   A configuration array containing information about the plugin instance.
   * @param string $pluginId
   *   The plugin_id for the plugin instance.
   * @param array $pluginDefinition
   *   The plugin implementation definition.
   * @param \Drupal\Core\Queue\QueueInterface $getOrdersQueue
   *   The get orders queue object.
   * @param \Drupal\my_module\Service\AmazonOrderMapper $orderMapper
   *   The order mapper service.
   * @param \GuzzleHttp\ClientInterface $client
   *   Guzzle HTTP Client.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $loggerFactory
   *   The logger factory object.
   */
  public function __construct(array $configuration, $pluginId, array $pluginDefinition, QueueInterface $getOrdersQueue, AmazonOrderMapper $orderMapper, ClientInterface $client, LoggerChannelFactoryInterface $loggerFactory) {
    parent::__construct($configuration, $pluginId, $pluginDefinition);
    $this->getOrdersQueue = $getOrdersQueue;
    $this->orderMapper = $orderMapper;
    $this->client = $client;
    $this->logger = $loggerFactory->get('my_module');
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $pluginId, $pluginDefinition) {
    return new static(
      $configuration,
      $pluginId,
      $pluginDefinition,
      $container->get('queue')->get('get_orders_from_amazon', TRUE),
      $container->get('my_module.order_mapper'),
      $container->get('http_client'),
      $container->get('logger.factory')
    );
  }

You’ll see that we require extra variables like $configuration here. This is because the QueueWorkerBase needs these to work. After you pass these through, you can then add the extra services similarly to how we did previously. You’ll see here we’re including the service we created a minute ago, and also a Guzzle HTTP Client which is used by our request class.

Now the fun bit: getting the orders! Inside processItem(), we will build the logic which gets the orders.

/**
   * {@inheritdoc}
   */
  public function processItem($item) {
    $parameters = $item;
    // Create a new ListOrdersRequest and pass in the client.
    $ordersRequest = new ListOrdersRequest($this->client);
    // Set the parameters.
    $ordersRequest->setParameters($parameters);
    $serviceRequest = new GetServiceStatusRequest($this->client, 'Orders');
    $lastUpdatedAfter = $ordersRequest->parameters->getLastUpdatedAfter();
    $lastUpdatedBefore = $ordersRequest->parameters->getLastUpdatedBefore();
    // Make sure the "Orders" Amazon MWS endpoint is operating.
    if ($serviceRequest->isServiceOperating()) {
      try {
        $orders = $ordersRequest->getOrders();
        $this->orderMapper->setLatestOrdersState('Processing');
      } catch (\Exception $e) {
        throw new SuspendQueueException(sprintf("Can't request orders. Error: %s", $e->getMessage()));
      }
    } else {
      throw new RequeueException('Orders service not operating.');
    }
    // If we manage to get orders, save them (or create a new queue and queue them
    // to be saved).
    if (!empty($orders)) {
      $this->orderMapper->saveOrders($orders);
    }
    // See if there is a next token for the next 100 orders -
    // this can be adapted to other APIs, such as checking for a
    // next page and passing it to the next request.
    if ($ordersRequest->parameters->hasNextToken()) {
      // If there are more orders, create a new queue item.
      $this->getOrdersQueue->createItem($ordersRequest->getParameters());
      $this->logger->notice('Queued another Amazon request for dates: %updatedAfter to %updatedBefore.', ['%updatedAfter' => $lastUpdatedAfter->format('Y-m-d H:i:s'), '%updatedBefore' => $lastUpdatedBefore->format('Y-m-d H:i:s')]);
    } else {
      $this->orderMapper->setLatestOrdersState('Complete');
    }
  }

As you can see, it won’t change the status to “Complete” until it’s finished and there are no more next tokens. As long as this status isn’t “Pending” or “Processing”, the function that originally fires to get the latest orders can run again.
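
The next-token pattern is easier to see without the Amazon classes around it. The following sketch uses a fake three-page API (fetch_page() and its data are made up) and an SplQueue in place of Drupal's queue; the while loop stands in for the separate runs that would process one item each in the real system.

```php
<?php

/**
 * Fake API: returns one page of orders plus the token for the next page.
 *
 * Hypothetical stand-in for a paged "list orders" call.
 */
function fetch_page(?string $token): array {
  $pages = [
    'start' => ['orders' => ['A1', 'A2'], 'next' => 'page-2'],
    'page-2' => ['orders' => ['B1', 'B2'], 'next' => 'page-3'],
    'page-3' => ['orders' => ['C1'], 'next' => NULL],
  ];
  return $pages[$token ?? 'start'];
}

$queue = new SplQueue();
// The first item has no token, so it requests the first page.
$queue->enqueue(['token' => NULL]);
$saved = [];
$status = 'Pending';

while (!$queue->isEmpty()) {
  $item = $queue->dequeue();
  $status = 'Processing';
  $page = fetch_page($item['token']);
  // Save whatever this page returned.
  $saved = array_merge($saved, $page['orders']);
  if ($page['next'] !== NULL) {
    // More pages exist: queue a follow-up item carrying the next token.
    $queue->enqueue(['token' => $page['next']]);
  }
  else {
    // No token left: the whole request is finished.
    $status = 'Complete';
  }
}
```

After the loop, $saved holds all five orders in page order and $status is 'Complete'. The same shape works for APIs that return a next page number or URL instead of a token.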

7. Decide how often you want your queue to be processed
If you want the queue to run via Cron, you can set the queue worker to process items on Cron runs by defining a time in the annotation:

/**
 * Processes tasks for Amazon Orders queue.
 *
 * @QueueWorker(
 *   id = "get_orders_from_amazon",
 *   title = @Translation("Queue worker for importing Amazon items."),
 *   cron = {"time" = 2}
 * )
 */

With the above, Drupal’s Cron will spend up to 2 seconds processing this queue each time Cron runs (the “time” value is a per-run time limit in seconds, not an interval, so how often the queue is processed depends on how often Cron itself runs). However, Amazon MWS gives you the restore rate and request quota for each endpoint, so we can use this to create a “wait time” to make sure you’re not requesting too often. You can create a service that uses states and queue interactions to only process queue items when enough time has passed after the last run, and process the queue externally.

/**
   * Constructs a new ProcessQueues object.
   * 
   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
   *   The queue factory object.
   * @param \Drupal\Core\Queue\QueueWorkerManager $queueWorker
   *   The queue worker object.
   * @param \Drupal\Core\State\State $state
   *   The state object.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $loggerFactory
   *   The logger factory object.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service.
   */
  public function __construct(QueueFactory $queueFactory, QueueWorkerManager $queueWorker, State $state, LoggerChannelFactoryInterface $loggerFactory, TimeInterface $time) {
    $this->queueFactory = $queueFactory;
    $this->queueWorker = $queueWorker;
    $this->state = $state;
    $this->logger = $loggerFactory->get('my_module');
    $this->time = $time;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker'),
      $container->get('state'),
      $container->get('logger.factory'),
      $container->get('datetime.time')
    );
  }

  /**
   * Process queues with a wait time.
   *
   * @param $queueId
   *   The queue ID.
   * @param $restoreRate
   *   The restore rate in seconds.
   * @param $requestQuota
   *   The request quota.
   * @throws \Exception
   */
  public function processQueueWithWaitTime($queueId, $restoreRate, $requestQuota) {
    // Get the queue if it exists.
    if ($queue = $this->queueFactory->get($queueId)) {
      $queueWorker = $this->queueWorker->createInstance($queueId);
      // Get the current request time.
      $requestTime = $this->time->getRequestTime();
      // If there is at least one item, and we can claim an item in the queue, then
      // try to process it.
      if ($queue->numberOfItems() > 0 && $item = $queue->claimItem(3600)) {
        try {
          // Get the last ran variable.
          $lastRun = $this->state->get("my_module_{$queueId}.last_run", 0);
          // Calculate the wait time (add 5 seconds to be safe).
          $waitTime = round($restoreRate / $requestQuota, 2, PHP_ROUND_HALF_UP) + 5;
          // See if the wait time has passed since the last run
          if ($lastRun == 0 || ($requestTime - $lastRun) >= $waitTime) {
            // If the wait time has passed, process the item.
            $queueWorker->processItem($item->data);
            $queue->deleteItem($item);
            // Set the last ran variable.
            $this->state->set("my_module_{$queueId}.last_run", $requestTime);
          } else {
            // If not enough time has passed, release the item to be processed next time.
            $queue->releaseItem($item);
          }
        // Some general exception handling below (and deciding whether to release
        // or delete the item depending on which exception is thrown).
        } catch (RequeueException $e) {
          // If we throw a RequeueException, we just want to release the item.
          $this->logger->notice('"%queueId" queue requeuing item, error: %error', array('%queueId' => $queueId, '%error' => $e->getMessage()));
          $queue->releaseItem($item);
        } catch (SuspendQueueException $e) {
          // If we have thrown a SuspendQueueException, it's likely we want to delete the item.
          $this->logger->error('"%queueId" queue deleting item, error: %error', array('%queueId' => $queueId, '%error' => $e->getMessage()));
          $queue->deleteItem($item);
        } catch (ServerException $e) {
          // A server exception might be that Amazon MWS is struggling, try to release the item.
          $this->logger->error('"%queueId" queue hit a Guzzle server exception, requeuing item. Error: %error', array('%queueId' => $queueId, '%error' => $e->getMessage()));
          $queue->releaseItem($item);
        } catch (\Exception $e) {
          // This is an exception we haven't prepared for - delete the item.
          $this->logger->error('"%queueId" queue deleting item, error: %error', array('%queueId' => $queueId, '%error' => $e->getMessage()));
          $queue->deleteItem($item);
        }
      }
    } else {
      // The queue does not exist.
      $this->logger->error('Could not find queue ID: %queue', array('%queue' => $queueId));
    }
  }

So, instead of using the in-built Cron functionality on the queue worker, we can just run the above function on a Cron job and it does a similar thing – it just gives us a bit more control on what we want to do.
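
As a worked example of the wait time arithmetic used in processQueueWithWaitTime(), here it is as two small standalone functions. The function names are hypothetical; the formula and the 5 second safety margin are the ones from the method above.

```php
<?php

/**
 * Calculates the wait time in seconds, with a 5 second safety margin.
 */
function wait_time(float $restoreRate, float $requestQuota): float {
  return round($restoreRate / $requestQuota, 2) + 5;
}

/**
 * Checks whether enough time has passed since the last run.
 *
 * A last run of 0 means the queue has never been processed.
 */
function wait_time_has_passed(int $lastRun, int $requestTime, float $waitTime): bool {
  return $lastRun === 0 || ($requestTime - $lastRun) >= $waitTime;
}

// A restore rate of 60 and a quota of 6 gives 60 / 6 + 5 = 15 seconds.
$waitTime = wait_time(60, 6);

// Only 10 seconds since the last run: too early, release the item.
$tooEarly = wait_time_has_passed(1000, 1010, $waitTime);
// 15 seconds since the last run: process the item.
$readyNow = wait_time_has_passed(1000, 1015, $waitTime);
```

With these inputs $waitTime is 15.0, $tooEarly is FALSE and $readyNow is TRUE.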

Again, to use the above as a service, just add it to your my_module.services.yml.

  my_module.process_queues:
    class: Drupal\my_module\Service\ProcessQueues
    arguments: ['@queue', '@plugin.manager.queue_worker', '@state', '@logger.factory', '@datetime.time']

8. Some final functions and Cron jobs
To get it all running, we set up two functions (I’ve done this in the .module file, so we have to call services statically as we cannot get access to the container here). You then need to get them firing by configuring crontabs for them (I use Ultimate Cron).

The first one is the one that will request the latest orders (using our order_mapper service), which I have operating every half an hour.

/**
 * Get the last updated orders.
 */
function my_module_get_last_updated_orders_callback() {
  $service = \Drupal::service('my_module.order_mapper');
  $service->getLastUpdatedOrders(true);
}

And the second one is the function to process the queue with a wait time (using our process_queues service), which runs every 2 minutes and won’t do anything if there’s nothing in the queue (it runs so often as the above function could go on for hours depending on how long ago you’re requesting orders from).

/**
 * Calls process Amazon order requests function.
 */
function my_module_request_orders_callback() {
  $service = \Drupal::service('my_module.process_queues');
  $service->processQueueWithWaitTime('get_orders_from_amazon', 60, 6);
}

9. Let the system do all the work
And there we go! A fairly robust way to poll orders from an API and not get throttled constantly. The queuing system happily runs away in the background, and you don’t have to do anything. So far, I haven’t found many issues with this method, and it grabs the latest orders on a regular basis.

10. To wrap it up…
A lot of things in this article are flattened down – for example, I tend to store restore rates and request quotas in my Amazon MWS classes and pull wait times out of there, and it’s good to split things out into smaller classes that serve single purposes.

Also, when doing the ListOrdersRequest, our system does a ListOrderItemsRequest inside of it, which falls back to using sleep() with the wait time if it needs to. Because our orders rarely contain a huge number of items, this works fine for us, but you’ll need to think about this sort of thing when building your modules.
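
For a rough idea of what such a sleep fallback can look like, here is a sketch. It is not the code from our system: call_with_wait() is a hypothetical helper, and the sleep function is passed in so the example can record the pauses instead of actually waiting.

```php
<?php

/**
 * Runs calls one after another, pausing between them.
 *
 * @param callable[] $calls
 *   The requests to make, in order.
 * @param float $waitSeconds
 *   How long to pause between two requests.
 * @param callable $sleep
 *   Function that performs the pause; receives the number of seconds.
 */
function call_with_wait(array $calls, float $waitSeconds, callable $sleep): array {
  $results = [];
  foreach (array_values($calls) as $index => $call) {
    if ($index > 0) {
      // No pause is needed before the very first request.
      $sleep($waitSeconds);
    }
    $results[] = $call();
  }
  return $results;
}

// Record the pauses rather than sleeping, to keep the example fast.
$slept = [];
$results = call_with_wait(
  [
    function () { return 'items for order 1'; },
    function () { return 'items for order 2'; },
  ],
  2.0,
  function (float $seconds) use (&$slept) { $slept[] = $seconds; }
);
```

In real use you would pass something like function (float $s) { usleep((int) ($s * 1000000)); } as the third argument. This only stays practical while the number of calls per queue item is small, for the maximum execution time reasons mentioned at the start.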

However, hopefully, this article gets across the concepts of continually grabbing data from third-party APIs. Queues, services, Cron jobs and wait times – and voila! A system that rarely complains, or gets complained at.

I hope you find this article useful and manage to adapt the example code to your needs. Below is an example of how the module could be set out in terms of code structure. Until next time, Happy coding! @littlepixiez

file directory screenshot

The Author: Tawny Bartlett
