Adding real-time processing to QueueWorker plugins
Projects no longer need to rely on unpredictable processing time frames. The SM project can intercept legacy Drupal @QueueWorker
items and insert them into the Symfony Messenger message bus, effectively giving existing core and contrib queue workers jobs real-time processing capabilities.
by
daniel.phin
/ 7 February 2024
This post is part 5 in a series about Symfony Messenger.
- Introducing Symfony Messenger integrations with Drupal
- Symfony Messenger’ message and message handlers, and comparison with @QueueWorker
- Real-time: Symfony Messenger’ Consume command and prioritised messages
- Automatic message scheduling and replacing hook_cron
- Adding real-time processing to QueueWorker plugins
- Making Symfony Mailer asynchronous: integration with Symfony Messenger
- Displaying notifications when Symfony Messenger messages are processed
- Future of Symfony Messenger in Drupal
QueueWorker plugins
@QueueWorker
plugin implementations require no modifications, including the method of dispatch, data payload, or the processItem
. The data payload must of course be serialisable. Fortunately, most QueueWorker
plugins already comply since their data is serialised and stored to the queue
table. As always, avoid adding complex objects like Drupal entities to payloads.
Runners
With queue interception, the sm
command can be solely relied upon. Legacy runners such as Drupal web cron, request termination cron (automated_cron.module
), and Drush queue:run
will be rendered inoperable since they will no longer have anything to process. Consider decommissioning legacy runners when deploying queue interception.
Setup
Queue interception is a part of the primary SM module. Adding a single line in settings.php
is the only action required to to enabling this feature:
$settings['queue_default'] = \Drupal\sm\QueueInterceptor\SmLegacyQueueFactory::class;
SM module will need to be fully installed before this line is added. Consider wrapping the line in a class_exists(SmLegacyQueueFactory::class)
to enable in a single deployment.
Existing per-queue backends
Setup may be more complex if projects are utilising per-queue backends or anything other than the default database backend for queues, such as Redis. In that case, carefully evaluate whether to convert all or specific queues to use Symfony Messenger.
Whether per-queue backends are utilised can be determined by looking for queue_service_
or queue_reliable_service_
prefixed items in settings.php
.
Routing
@QueueWorker
jobs are converted to \Drupal\sm\QueueInterceptor\SmLegacyDrupalQueueItem
messages in the backend. Knowing this class name allows you to configure transport routing. If routing for this message is not explicitly configured, it will naturally fall back to the default transport, or execute synchronously if there is no routing configuration.
Running the jobs
As usual, when a transport is configured, all you need to do is run sm messenger:consume
to execute the tasks. The worker will either listen or poll for messages, and execute them in a very short amount of time after they are dispatched, in a dedicated thread. More information on the worker can be found in post 3 of this series.
The next post covers how Drupal emails can be dispatched to messages, so the web thread can execute faster.
Tagged