Spiral has full integration with RoadRunner jobs plugin, which provides a unified queueing API for a variety of queue brokers such as:
Note
For more information on supported brokers, please visit the official site.
To enable the integration with RoadRunner, Spiral provides built-in support through the spiral/roadrunner-bridge package.
Warning
Actual version of thespiral/roadrunner-bridge
package is3.0
. If you are using older version of the package, some of the features described in this document may not be available.
To get started, you need to install Roadrunner bridge package. Once installed,
add the Spiral\RoadRunnerBridge\Bootloader\QueueBootloader
to the list of bootloaders in your Kernel class:
public function defineBootloaders(): array
{
return [
// ...
\Spiral\RoadRunnerBridge\Bootloader\QueueBootloader::class,
// ...
];
}
Read more about bootloaders in the Framework — Bootloaders section.
By doing this, an additional queue driver named roadrunner
will be registered automatically. You can then add a new
queue connection for RoadRunner in your app/config/queue.php
configuration file.
RoadRunner provides two ways to declare pipelines:
.rr.yaml
configuration file to declare pipelines and brokers.app/config/queue.php
..rr.yaml
This is the most common way to declare pipelines. But in this way, you can only declare static pipelines.
Note
If you need to declare dynamic pipelines, you should consider using the second way.
Here's a simple example:
amqp:
addr: amqp://guest:guest@localhost:5672
jobs:
consume: [ "in-memory", "high-priority" ]
pipelines:
in-memory:
driver: memory
config:
priority: 10
high-priority:
driver: amqp
config:
priority: 1
In this case your app/config/queue.php
configuration file should look like this:
return [
'default' => env('QUEUE_CONNECTION', 'roadrunner'),
'connections' => [
'roadrunner' => [
'driver' => 'roadrunner',
'pipeline' => 'in-memory', // Pipeline name from .rr.yaml
],
],
];
This method is useful when you need to declare dynamic pipelines or if you just want to keep all your configuration in one place.
There are some benefits to this approach:
You can create dynamic pipelines to distribute workload across multiple queue brokers or to balance the load across multiple workers. This can be useful in large-scale applications where job processing needs to be distributed across multiple servers or instances.
Here's a simple example:
use Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo;
use Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo;
return [
'default' => env('QUEUE_CONNECTION', 'rr-amqp'),
'pipelines' => [
'high-priority' => [
'connector' => new AMQPCreateInfo(
name: 'high-priority',
priority: 1,
queue: 'default',
),
'consume' => true, // Consume jobs for this pipeline on startup
],
'low-priority' => [
'connector' => new AMQPCreateInfo(
name: 'low-priority',
priority: 100,
queue: 'default',
),
'consume' => false, // Do not consume jobs for this pipeline on startup
],
'in-memory' => [
'connector' => new MemoryCreateInfo(name: 'local'),
'consume' => true,
]
],
'connections' => [
// ...
'rr-amqp' => [
'driver' => 'roadrunner',
'pipeline' => 'low-priority',
],
'rr-memory' => [
'driver' => 'roadrunner',
'pipeline' => 'in-memory',
],
],
];
Warning
All the pipelines defined with'consume' => true
will be initialized during application bootstrapping. This means that all the jobs pushed to these pipelines will be consumed immediately. All the pipelines with'consume' => false
will be initialized only when you push a job to them.
In some cases, you may need to create a pipeline with custom default options or in cases where the options are mandatory. For example, you may want to create a pipeline for Kafka broker, which requires additional options to be set.
use Spiral\RoadRunner\Jobs\Queue\KafkaCreateInfo;
use Spiral\RoadRunner\Jobs\KafkaOptions;
'pipelines' => [
'event-bus' => [
'connector' => new KafkaCreateInfo(name: 'kafka', topic: 'events', ...),
'options' => new KafkaOptions(topic: 'events', ...),
'consume' => true,
]
],
By default, when you push a job to the queue using the roadrunner
driver, the job will be pushed to the default
pipeline defined in your app/config/queue.php
configuration file.
'rr-amqp' => [
'driver' => 'roadrunner',
'pipeline' => 'low-priority',
],
If you want to push a job to a specific pipeline, you can specify the pipeline name
using Spiral\Queue\Options::onQueue
method.
use App\Endpoint\Job\SampleJob;
use Spiral\Queue\QueueInterface;
use Spiral\Queue\Options;
public function createJob(QueueInterface $queue): void
{
$queue->push(
SampleJob::class,
['value' => 123],
Options::onQueue('high-priority')
);
}
Spiral queue component provides Spiral\Queue\Options
class, but in some cases you may need to use options specific
to the roadrunner driver. In this case you can use options that implement Spiral\RoadRunner\Jobs\OptionsInterface
.
For example, if you want to pass additional options to Kafka broker, you can use Spiral\RoadRunner\Jobs\KafkaOptions
class.
use App\Endpoint\Job\SampleJob;
use Spiral\Queue\QueueInterface;
use Spiral\RoadRunner\Jobs\KafkaOptions;
public function createJob(QueueInterface $queue): void
{
$queue->push(
SampleJob::class,
['value' => 123],
new KafkaOptions(topic: 'events')
);
}
In some cases, you may need to create a custom driver for a specific queue broker.
For example, you may want to create a driver for specific queue broker.
namespace App\Infrastructure\Queue;
use Spiral\Queue\OptionsInterface;
use Spiral\Queue\QueueInterface;
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\Queue\KafkaCreateInfo;
use Spiral\RoadRunner\Jobs\KafkaOptions;
final class KafkaQueue imlements QueueInterface
{
private readonly QueueInterface $queue;
public function __construct(JobsInterface $jobs, string $name, string $topic)
{
$this->queue = $jobs->create(
new KafkaCreateInfo(
name: $name,
topic: $topic
),
new KafkaOptions($topic)
);
}
public function push(string $name, array $payload = [], OptionsInterface|KafkaOptions $options = null): string
{
return $this->queue->push($name, $payload, $options)->getId();
}
}
That's all. Now you can use your driver:
'connections' => [
'mail' => [
'driver' => \App\Infrastructure\Queue\KafkaQueue::class,
'name' => 'mail',
'topic' => 'mail',
],
],
See more
Read more about creating custom drivers in the Queue — Getting started section.