Revision: Sat, 21 Dec 2024 21:05:36 GMT

Configuring RoadRunner Queue Pipelines

Job queues have become an indispensable component in modern PHP applications, handling complex, resource-intensive tasks and delivering substantial enhancements to application performance.

One solution that harnesses the power of Go for PHP applications is RoadRunner, in conjunction with the Spiral Framework. This tutorial, will guide you through configuring queue pipelines for RoadRunner in Spiral applications, offering a high-performing, robust queue service solution.

Note
This tutorial covers the basics of the components and approaches. For more detailed information, we suggest referring to the relevant sections.

Our application will consist of two types of applications:

  1. Producer - will push jobs into a queue
  2. Consumer - will receive queued tasks and handle them

Producer

To get started with building producer application, you can easily install the default spiral/app bundle with most of the required components by running the following command:

composer create-project spiral/app my-app

During the installation process, you will be prompted to select various options with the Spiral installer, such as the application preset, whether to use Cycle ORM, which collections to use, which validator component to use, and so on. In our example we will use CLi application that will push a task into a queue from console command.

For this tutorial, we recommend choosing the options shown above:

✔ Which application preset do you want to install? > Cli
✔ Create a default application structure and demo data? > No
✔ Do you need Cycle ORM? > No
✔ Do you want to use Queue component? > Yes
✔ Do you want to use Cache component? > No
✔ Do you want to use Mailer Component? > No
✔ Do you want to use Storage component? > No
✔ Do you need RoadRunner? > Yes
✔ Do you need the RoadRunner Metrics? > No
✔ Do you need the Temporal? > No

Once the installation is complete, you need to configure RoadRunner queue pipelines, where you will push your jobs.

First of all you need to configure RoadRunner server to use jobs plugin:

yaml
.rr.yaml
amqp:
  addr: amqp://guest:guest@127.0.0.1:5672

jobs:
  consume: [ ]
  pipelines: { }

Note
You can read more about RoadRunner Jobs plugin configuration in the official documentation

As you can see we didn't specify any pipelines in our configuration. RoadRunner provides the ability to create pipelines dynamically, so we will create them later in our application.

Once the configuration is complete, you can start the server. RoadRunner uses RPC to communicate between PHP application and RoadRunner, so we need to start it before pushing jobs into a queue.

Let's check if everything work fine using the following command:

./rr serve

Configuration

The configuration of Spiral applications is accomplished through configuration files located in the app/config directory.

Let's define our first pipeline in the app/config/queue.php file:

php
app/config/queue.php
use Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo;

return [
    'default' => env('QUEUE_CONNECTION', 'roadrunner'),

    'pipelines' => [
         'default' => [
             'connector' => new AMQPCreateInfo(
                  name: 'default',
                  priority: 100,
                  queue: 'default',
             ),
             // Do not consume jobs for this pipeline on startup
             'consume' => false,
         ],
    ],
    
    'connections' => [
        'roadrunner' => [
            'driver' => 'roadrunner',
            'pipeline' => 'default',
        ],
    ],
];

Note
You can read more about roadrunner queue configuration in the Queue — RoadRunner integration section.

When you run the ./rr serve command, RoadRunner will create a pipeline with the name default and will use the roadrunner connection to push jobs into the queue by default.

Pushing jobs

Let's add some logic to it:

php
app/src/Endpoint/Console/PingCommand.php
namespace App\Endpoint\Console;

use Spiral\Console\Attribute\AsCommand;
use Spiral\Console\Command;
use Spiral\Queue\QueueInterface;

#[AsCommand(name: 'ping')]
final class PingCommand extends Command
{
    public function __invoke(QueueInterface $queue): int
    {
        $queue->push('ping', [
            'url' => 'https://spiral.dev',
        ]);

        return self::SUCCESS;
    }
}

In case if we use an array payload we can use simple serializer, like json.

Note
Read more about available serializers in the Component — Serializer section.

Let's define a default serializer in the app/config/queue.php file:

php
app/config/queue.php
return [
    // ...
    'defaultSerializer' => 'json',
];

Note
Read more about job payload serialization in the Queue — Running Jobs section.

And define default serializer in the app/config/queue.php file:

php
app/config/queue.php
return [
    // ...
    'defaultSerializer' => 'symfony-json',
];

Note
Read more about job payload serialization in the Queue — Running Jobs section.

That's it! Now you can push jobs into a queue using object payload and it will be automatically serialized and sent to a queue as a JSON string.

Let's create first a DTO class that will carry all the data we need:

php
app/src/DTO/Ping.php
namespace App\DTO;

final class Ping 
{
    public function __construct(
        public readonly string $url,
    ) {}
}

Now we can use it as a payload to push a job into a queue:

php
app/src/Endpoint/Console/PingCommand.php
namespace App\Endpoint\Console;

use Spiral\Console\Attribute\AsCommand;
use Spiral\Console\Command;
use Spiral\Queue\QueueInterface;
use App\DTO\PingDTO;

#[AsCommand(name: 'ping')]
final class PingCommand extends Command
{
    public function __invoke(QueueInterface $queue): int
    {
        $queue->push('ping', new Ping(url: 'https://spiral.dev'));

        return self::SUCCESS;
    }
}

:::

::::

Once we created a console command ping, we can push a job into a queue.

First, we need to start the RoadRunner server:

./rr serve

and then run our command:

php app.php ping

Consumer

To get started with building consumer application, you can easily install the default spiral/app bundle with most of the required components by running the following command:

composer create-project spiral/app my-consumer-app

For consumer, we need also Queue component and RoadRunner. Other components are optional and you can choose which one you need during the installation process.

✔ Which application preset do you want to install? > Cli
✔ Create a default application structure and demo data? > No
✔ Do you want to use Queue component? > Yes
✔ Do you need RoadRunner? > Yes

Once the installation is complete, you need to configure RoadRunner queue pipelines, where you will push your jobs.

First of all you need to configure RoadRunner server to use jobs plugin:

yaml
.rr.yaml
amqp:
  addr: amqp://guest:guest@127.0.0.1:5672

jobs:
  consume: [ ]
  pipelines: { }

Note
amqp section should be the same as in the producer application. Consumer and Producer should use the same AMQP server.

Configuration

Let's define our pipeline in the app/config/queue.php file:

php
app/config/queue.php
use Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo;

return [
    'default' => env('QUEUE_CONNECTION', 'roadrunner'),

    'pipelines' => [
         'default' => [
             'connector' => new AMQPCreateInfo(
                  name: 'default',
                  priority: 100,
                  queue: 'default',
             ),
             'consume' => true, // <===== Enables consuming
         ],
    ],
    
    'connections' => [
        'roadrunner' => [
            'driver' => 'roadrunner',
            'pipeline' => 'default',
        ],
    ],
];

Note
The only difference between consumer and producer configuration is that consumer should have consume option set to true. In this case, RoadRunner will automatically consume jobs from the AMQP server.

Job

When a job is going to be consumed, it will be passed to the job handler class that has all the logic to handle it.

Note
Read more about job handlers in the Queue — Job Handlers section.

Let's create our first job using scaffolder:

php app.php create:jobHandler Ping

Note
You can read more about scaffolding in the Basics — Scaffolding section.

After running this command, you will see the following output:

Declaration of 'PingJob' has been successfully written into '~/my-app/app/src/Endpoint/Job/PingJob.php'.

We've just created a job handler that will be used to handle our jobs.

Let's add some logic to it:

If you are using a JSON payload, you can use simple serializer like json to deserialize a payload received from a queue.

Let's define a default serializer in the app/config/queue.php file:

php
app/config/queue.php
return [
    // ...
    'defaultSerializer' => 'json',
];

Now let's add some logic to our job handler:

php
app/src/Endpoint/Job/PingJob.php
namespace App\Endpoint\Job;

use Psr\Log\LoggerInterface;
use Spiral\Queue\JobHandler;
use App\DTO\Ping;

final class PingJob extends JobHandler
{
    public function invoke(
        LoggerInterface $logger, 
        string $id, 
        Ping $payload, 
        array $headers,
    ): void {
        $logger->info('Ping job received', [
            'id' => $id,
            'url' => $payload->url,
            'headers' => $headers,
        ]);
    }
}

Our job handler will just log all the data received from the queue.

When we push a job into a queue using push method, we specified a task name ping in first argument. Now we need to tell consumer that if it receives a job with this task name, it should be handled by PingJob handler.

Let's connect our job handler with the task name in the app/config/queue.php file:

php
app/config/queue.php
return [
    // ...
    'registry' => [
        'handlers' => [
            'ping' => App\Endpoint\Job\PingJob::class
        ],
    ],
];

Note
You can read more about job handlers registry in the Queue — Job Handlers section.

After all these steps, we are ready to consume jobs from the queue.

Let's run RoadRunner server:

./rr serve

And push a job into a queue from producer application:

php app.php ping

Retry policy

Let's imagine that we have a job that should be retried if it fails. For example, we have a job that sends a request to a remote server. If the server is not available, we should retry this job after some time.

In this case we can use job headers and queue interceptors to implement retry policy.

Note
Read more about queue interceptors in the Queue — Interceptors section.

Let's create an interceptor that will catch all the exceptions in a job handler and try to retry it after some time:

php
app/src/Endpoint/Job/Interceptor/RetryPolicyInterceptor.php
namespace App\Endpoint\Job\Interceptor;

use Carbon\Carbon;
use Psr\Log\LoggerInterface;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterface;
use Spiral\Exceptions\ExceptionReporterInterface;
use Spiral\Queue\Exception\FailException;
use Spiral\Queue\Exception\RetryException;
use Spiral\Queue\Options;

final class RetryPolicyInterceptor implements CoreInterceptorInterface
{
    public function __construct(
        private readonly LoggerInterface $logger,
        private readonly ExceptionReporterInterface $reporter,
        private readonly int $maxAttempts = 3,
        private readonly int $delayInSeconds = 5,
    ) {
    }

    public function process(string $controller, string $action, array $parameters, CoreInterface $core): mixed
    {
        try {
        
            // Try to execute a job handler
            return $core->callAction($controller, $action, $parameters);
            
        } catch (\Throwable $e) {
        
            // Report an exception
            $this->reporter->report($e);
            
            $headers = $parameters['headers'] ?? [];
            
            // Get attempts count from headers or if it the first attempt, use max attempts count
            $attempts = (int)($headers['attempts'] ?? $this->maxAttempts);
            
            // If attempts are over, throw a FailException
            if ($attempts === 0) {
                $this->logger->warning('Job handling failed: ['.$e->getMessage().']');
                
                throw new FailException($e->getMessage(), $e->getCode(), $e);
            }

            throw new RetryException(
                reason: $e->getMessage(),
                options: (new Options())->withDelay($this->delay)->withHeader('attempts', (string)($attempts - 1))
            );
        }
    }
}

Now we need to register our interceptor in the app/config/queue.php file:

php
app/config/queue.php
use App\Endpoint\Job\Interceptor\RetryPolicyInterceptor;

return [    
    // ...
    'interceptors' => [
        'consume' => [
            RetryPolicyInterceptor::class,
        ],
    ],
];

Now if our job handler fails, it will be retried after 5 seconds. After 3 attempts, the job will be marked as failed.

Want more? Unlock the Power of Advanced Workflow Orchestration

Spiral Framework provides an integration with Temporal, a powerful workflow orchestration tool that allows you to build complex workflows. Now, if you’re familiar with queue services like RoadRunner, you’re in for a treat because Temporal IO takes workflow management to a whole new level. It’s like having superpowers for handling complex workflows in a simple and elegant manner.

In the world of PHP development, we often find ourselves juggling various tasks and processes that need to be executed in a specific order. That’s where Temporal shines. It allows us to write expressive and powerful workflows in a way that’s easy to understand and maintain.

Let’s dive into an example that showcases the beauty of Temporal

Imagine you have a task of handling user subscriptions on a monthly basis. With Temporal, it becomes a straightforward process. Here’s a simple example to illustrate it:

php
<?php
/**
 * This file is part of Temporal package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
declare(strict_types=1);

namespace Temporal\Samples\Subscription;

use Carbon\CarbonInterval;
use Temporal\Activity\ActivityOptions;
use Temporal\Exception\Failure\CanceledFailure;
use Temporal\Workflow;

/**
 * Demonstrates a long-running process to represent a user subscription workflow.
 */
class SubscriptionWorkflow implements SubscriptionWorkflowInterface
{
    private $account;

    // Workflow logic goes here...

    public function subscribe(string $userID)
    {
        yield $this->account->sendWelcomeEmail($userID);

        try {
            $trialPeriod = true;
            while (true) {
                // Lower the period duration to observe workflow behavior
                yield Workflow::timer(CarbonInterval::days(30));

                if ($trialPeriod) {
                    yield $this->account->sendEndOfTrialEmail($userID);
                    $trialPeriod = false;
                    continue;
                }

                yield $this->account->chargeMonthlyFee($userID);
                yield $this->account->sendMonthlyChargeEmail($userID);
            }
        } catch (CanceledFailure $e) {
            yield Workflow::asyncDetached(
                function () use ($userID) {
                    yield $this->account->processSubscriptionCancellation($userID);
                    yield $this->account->sendSorryToSeeYouGoEmail($userID);
                }
            );
        }
    }
}

In this example, the subscribe method represents the workflow logic for managing monthly subscriptions. The magic lies in the Workflow::timer function, which allows you to schedule a specific duration for each iteration of the loop.

By setting the timer to CarbonInterval::months(1), you can ensure that the subscription tasks are executed every month. Temporal takes care of the scheduling and coordination, freeing you from the hassle of managing it manually.

Moreover, Temporal provides built-in fault tolerance and scalability. If an exception occurs, such as a CanceledFailure indicating a subscription cancellation, you can handle it gracefully within the workflow.

With Temporal, managing complex periodic workflows like monthly subscriptions becomes a breeze.