mirror of
https://github.com/symfony/symfony-docs.git
synced 2026-03-23 16:22:10 +01:00
This PR was merged into the 7.3 branch.
Discussion
----------
[Messenger] Document TransportMessageIdStamp support in AMQP transport
Fixes #20659
Commits
-------
d36326df9 [Messenger] Document TransportMessageIdStamp support in AMQP transport
3831 lines
141 KiB
ReStructuredText
3831 lines
141 KiB
ReStructuredText
Messenger: Sync & Queued Message Handling
|
|
=========================================
|
|
|
|
Messenger provides a message bus with the ability to send messages and then
|
|
handle them immediately in your application or send them through transports
|
|
(e.g. queues) to be handled later. To learn more about it, read the
|
|
:doc:`Messenger component docs </components/messenger>`.
|
|
|
|
Installation
|
|
------------
|
|
|
|
In applications using :ref:`Symfony Flex <symfony-flex>`, run this command to
|
|
install messenger:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/messenger
|
|
|
|
Creating a Message & Handler
|
|
----------------------------
|
|
|
|
Messenger centers around two different classes that you'll create: (1) a message
|
|
class that holds data and (2) a handler(s) class that will be called when that
|
|
message is dispatched. The handler class will read the message class and perform
|
|
one or more tasks.
|
|
|
|
There are no specific requirements for a message class, except that it can be
|
|
serialized::
|
|
|
|
// src/Message/SmsNotification.php
|
|
namespace App\Message;
|
|
|
|
class SmsNotification
|
|
{
|
|
public function __construct(
|
|
private string $content,
|
|
) {
|
|
}
|
|
|
|
public function getContent(): string
|
|
{
|
|
return $this->content;
|
|
}
|
|
}
|
|
|
|
.. _messenger-handler:
|
|
|
|
A message handler is a PHP callable, the recommended way to create it is to
|
|
create a class that has the :class:`Symfony\\Component\\Messenger\\Attribute\\AsMessageHandler`
|
|
attribute and has an ``__invoke()`` method that's type-hinted with the
|
|
message class (or a message interface)::
|
|
|
|
// src/MessageHandler/SmsNotificationHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\SmsNotification;
|
|
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
|
|
|
#[AsMessageHandler]
|
|
class SmsNotificationHandler
|
|
{
|
|
public function __invoke(SmsNotification $message)
|
|
{
|
|
// ... do some work - like sending an SMS message!
|
|
}
|
|
}
|
|
|
|
.. tip::
|
|
|
|
You can also use the ``#[AsMessageHandler]`` attribute on individual class
|
|
methods. You may use the attribute on as many methods in a single class as you
|
|
like, allowing you to group the handling of multiple related types of messages.
|
|
|
|
Thanks to :ref:`autoconfiguration <services-autoconfigure>` and the ``SmsNotification``
|
|
type-hint, Symfony knows that this handler should be called when an ``SmsNotification``
|
|
message is dispatched. Most of the time, this is all you need to do. But you can
|
|
also :ref:`manually configure message handlers <messenger-handler-config>`. To
|
|
see all the configured handlers, run:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console debug:messenger
|
|
|
|
Dispatching the Message
|
|
-----------------------
|
|
|
|
You're ready! To dispatch the message (and call the handler), inject the
|
|
``messenger.default_bus`` service (via the ``MessageBusInterface``), like in a controller::
|
|
|
|
// src/Controller/DefaultController.php
|
|
namespace App\Controller;
|
|
|
|
use App\Message\SmsNotification;
|
|
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
|
use Symfony\Component\HttpFoundation\Response;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
class DefaultController extends AbstractController
|
|
{
|
|
public function index(MessageBusInterface $bus): Response
|
|
{
|
|
// will cause the SmsNotificationHandler to be called
|
|
$bus->dispatch(new SmsNotification('Look! I created a message!'));
|
|
|
|
// ...
|
|
}
|
|
}
|
|
|
|
Transports: Async/Queued Messages
|
|
---------------------------------
|
|
|
|
By default, messages are handled as soon as they are dispatched. If you want
|
|
to handle a message asynchronously, you can configure a transport. A transport
|
|
is capable of sending messages (e.g. to a queueing system) and then
|
|
:ref:`receiving them via a worker <messenger-worker>`. Messenger supports
|
|
:ref:`multiple transports <messenger-transports-config>`.
|
|
|
|
.. note::
|
|
|
|
If you want to use a transport that's not supported, check out the
|
|
`Enqueue's transport`_, which backs services like Kafka and Google
|
|
Pub/Sub.
|
|
|
|
A transport is registered using a "DSN". Thanks to Messenger's Flex recipe, your
|
|
``.env`` file already has a few examples.
|
|
|
|
.. code-block:: env
|
|
|
|
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
|
|
# MESSENGER_TRANSPORT_DSN=doctrine://default
|
|
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
|
|
|
|
Uncomment whichever transport you want (or set it in ``.env.local``). See
|
|
:ref:`messenger-transports-config` for more details.
|
|
|
|
Next, in ``config/packages/messenger.yaml``, let's define a transport called ``async``
|
|
that uses this configuration:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async: "%env(MESSENGER_TRANSPORT_DSN)%"
|
|
|
|
# or expanded to configure more options
|
|
#async:
|
|
# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
|
|
# options: []
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async">%env(MESSENGER_TRANSPORT_DSN)%</framework:transport>
|
|
|
|
<!-- or expanded to configure more options -->
|
|
<framework:transport name="async"
|
|
dsn="%env(MESSENGER_TRANSPORT_DSN)%"
|
|
>
|
|
<option key="...">...</option>
|
|
</framework:transport>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$framework->messenger()
|
|
->transport('async')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
;
|
|
|
|
$framework->messenger()
|
|
->transport('async')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
->options([])
|
|
;
|
|
};
|
|
|
|
.. _messenger-routing:
|
|
|
|
Routing Messages to a Transport
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Now that you have a transport configured, instead of handling a message immediately,
|
|
you can configure them to be sent to a transport:
|
|
|
|
.. _messenger-message-attribute:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: php-attributes
|
|
|
|
// src/Message/SmsNotification.php
|
|
namespace App\Message;
|
|
|
|
use Symfony\Component\Messenger\Attribute\AsMessage;
|
|
|
|
#[AsMessage('async')]
|
|
class SmsNotification
|
|
{
|
|
// ...
|
|
}
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async: "%env(MESSENGER_TRANSPORT_DSN)%"
|
|
|
|
routing:
|
|
# async is whatever name you gave your transport above
|
|
'App\Message\SmsNotification': async
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:routing message-class="App\Message\SmsNotification">
|
|
<!-- async is whatever name you gave your transport above -->
|
|
<framework:sender service="async"/>
|
|
</framework:routing>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$framework->messenger()
|
|
// async is whatever name you gave your transport above
|
|
->routing('App\Message\SmsNotification')->senders(['async'])
|
|
;
|
|
};
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
The ``#[AsMessage]`` attribute was introduced in Symfony 7.2.
|
|
|
|
Thanks to this, the ``App\Message\SmsNotification`` will be sent to the ``async``
|
|
transport and its handler(s) will *not* be called immediately. Any messages not
|
|
matched under ``routing`` will still be handled immediately, i.e. synchronously.
|
|
|
|
.. note::
|
|
|
|
If you configure routing with both YAML/XML/PHP configuration files and
|
|
PHP attributes, the configuration always takes precedence over the class
|
|
attribute. This behavior allows you to override routing on a per-environment basis.
|
|
|
|
.. note::
|
|
|
|
When configuring the routing in separate YAML/XML/PHP files, you can use a partial
|
|
PHP namespace like ``'App\Message\*'`` to match all the messages within the
|
|
matching namespace. The only requirement is that the ``'*'`` wildcard has to
|
|
be placed at the end of the namespace.
|
|
|
|
You may use ``'*'`` as the message class. This will act as a default routing
|
|
rule for any message not matched under ``routing``. This is useful to ensure
|
|
no message is handled synchronously by default.
|
|
|
|
The only drawback is that ``'*'`` will also apply to the emails sent with the
|
|
Symfony Mailer (which uses ``SendEmailMessage`` when Messenger is available).
|
|
This could cause issues if your emails are not serializable (e.g. if they include
|
|
file attachments as PHP resources/streams).
|
|
|
|
You can also route classes by their parent class or interface. Or send messages
|
|
to multiple transports:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: php-attributes
|
|
|
|
// src/Message/SmsNotification.php
|
|
namespace App\Message;
|
|
|
|
use Symfony\Component\Messenger\Attribute\AsMessage;
|
|
|
|
#[AsMessage(['async', 'audit'])]
|
|
class SmsNotification
|
|
{
|
|
// ...
|
|
}
|
|
|
|
// if you prefer, you can also apply multiple attributes to the message class
|
|
#[AsMessage('async')]
|
|
#[AsMessage('audit')]
|
|
class SmsNotification
|
|
{
|
|
// ...
|
|
}
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
routing:
|
|
# route all messages that extend this example base class or interface
|
|
'App\Message\AbstractAsyncMessage': async
|
|
'App\Message\AsyncMessageInterface': async
|
|
|
|
'My\Message\ToBeSentToTwoSenders': [async, audit]
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<!-- route all messages that extend this example base class or interface -->
|
|
<framework:routing message-class="App\Message\AbstractAsyncMessage">
|
|
<framework:sender service="async"/>
|
|
</framework:routing>
|
|
<framework:routing message-class="App\Message\AsyncMessageInterface">
|
|
<framework:sender service="async"/>
|
|
</framework:routing>
|
|
<framework:routing message-class="My\Message\ToBeSentToTwoSenders">
|
|
<framework:sender service="async"/>
|
|
<framework:sender service="audit"/>
|
|
</framework:routing>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
// route all messages that extend this example base class or interface
|
|
$messenger->routing('App\Message\AbstractAsyncMessage')->senders(['async']);
|
|
$messenger->routing('App\Message\AsyncMessageInterface')->senders(['async']);
|
|
$messenger->routing('My\Message\ToBeSentToTwoSenders')->senders(['async', 'audit']);
|
|
};
|
|
|
|
.. note::
|
|
|
|
If you configure routing for both a child and parent class, both rules
|
|
are used. E.g. if you have an ``SmsNotification`` object that extends
|
|
from ``Notification``, both the routing for ``Notification`` and
|
|
``SmsNotification`` will be used.
|
|
|
|
.. tip::
|
|
|
|
You can define and override the transport that a message is using at
|
|
runtime by using the
|
|
:class:`Symfony\\Component\\Messenger\\Stamp\\TransportNamesStamp` on
|
|
the envelope of the message. This stamp takes an array of transport
|
|
name as its only argument. For more information about stamps, see
|
|
`Envelopes & Stamps`_.
|
|
|
|
Doctrine Entities in Messages
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
If you need to pass a Doctrine entity in a message, it's better to pass the entity's
|
|
primary key (or whatever relevant information the handler actually needs, like ``email``,
|
|
etc.) instead of the object (otherwise you might see errors related to the Entity Manager)::
|
|
|
|
// src/Message/NewUserWelcomeEmail.php
|
|
namespace App\Message;
|
|
|
|
class NewUserWelcomeEmail
|
|
{
|
|
public function __construct(
|
|
private int $userId,
|
|
) {
|
|
}
|
|
|
|
public function getUserId(): int
|
|
{
|
|
return $this->userId;
|
|
}
|
|
}
|
|
|
|
Then, in your handler, you can query for a fresh object::
|
|
|
|
// src/MessageHandler/NewUserWelcomeEmailHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\NewUserWelcomeEmail;
|
|
use App\Repository\UserRepository;
|
|
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
|
|
|
#[AsMessageHandler]
|
|
class NewUserWelcomeEmailHandler
|
|
{
|
|
public function __construct(
|
|
private UserRepository $userRepository,
|
|
) {
|
|
}
|
|
|
|
public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
|
|
{
|
|
$user = $this->userRepository->find($welcomeEmail->getUserId());
|
|
|
|
// ... send an email!
|
|
}
|
|
}
|
|
|
|
This guarantees the entity contains fresh data.
|
|
|
|
.. _messenger-handling-messages-synchronously:
|
|
|
|
Handling Messages Synchronously
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
If a message doesn't :ref:`match any routing rules <messenger-routing>`, it won't
|
|
be sent to any transport and will be handled immediately. In some cases (like
|
|
when `binding handlers to different transports`_),
|
|
it's easier or more flexible to handle this explicitly: by creating a ``sync``
|
|
transport and "sending" messages there to be handled immediately:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
# ... other transports
|
|
|
|
sync: 'sync://'
|
|
|
|
routing:
|
|
App\Message\SmsNotification: sync
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<!-- ... other transports -->
|
|
|
|
<framework:transport name="sync" dsn="sync://"/>
|
|
|
|
<framework:routing message-class="App\Message\SmsNotification">
|
|
<framework:sender service="sync"/>
|
|
</framework:routing>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
// ... other transports
|
|
|
|
$messenger->transport('sync')->dsn('sync://');
|
|
$messenger->routing('App\Message\SmsNotification')->senders(['sync']);
|
|
};
|
|
|
|
Creating your Own Transport
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
You can also create your own transport if you need to send or receive messages
|
|
from something that is not supported. See :doc:`/messenger/custom-transport`.
|
|
|
|
.. _messenger-worker:
|
|
|
|
Consuming Messages (Running the Worker)
|
|
---------------------------------------
|
|
|
|
Once your messages have been routed, in most cases, you'll need to "consume" them.
|
|
You can do this with the ``messenger:consume`` command:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console messenger:consume async
|
|
|
|
# use -vv to see details about what's happening
|
|
$ php bin/console messenger:consume async -vv
|
|
|
|
The first argument is the receiver's name (or service id if you routed to a
|
|
custom service). By default, the command will run forever: looking for new messages
|
|
on your transport and handling them. This command is called your "worker".
|
|
|
|
If you want to consume messages from all available receivers, you can use the
|
|
command with the ``--all`` option:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console messenger:consume --all
|
|
|
|
.. versionadded:: 7.1
|
|
|
|
The ``--all`` option was introduced in Symfony 7.1.
|
|
|
|
Messages that take a long time to process may be redelivered prematurely because
|
|
some transports assume that an unacknowledged message is lost. To prevent this
|
|
issue, use the ``--keepalive`` command option to specify an interval (in seconds;
|
|
default value = ``5``) at which the message is marked as "in progress". This prevents
|
|
the message from being redelivered until the worker completes processing it:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console messenger:consume --keepalive
|
|
|
|
.. note::
|
|
|
|
This option is only available for the following transports: Beanstalkd, AmazonSQS, Doctrine and Redis.
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
The ``--keepalive`` option was introduced in Symfony 7.2.
|
|
|
|
.. tip::
|
|
|
|
In a development environment and if you're using the Symfony CLI tool,
|
|
you can configure workers to be automatically run along with the webserver.
|
|
You can find more information in the
|
|
:ref:`Symfony CLI Workers <symfony-server_configuring-workers>` documentation.
|
|
|
|
.. tip::
|
|
|
|
To properly stop a worker, throw an instance of
|
|
:class:`Symfony\\Component\\Messenger\\Exception\\StopWorkerException`.
|
|
|
|
Deploying to Production
|
|
~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
On production, there are a few important things to think about:
|
|
|
|
**Use a Process Manager like Supervisor or systemd to keep your worker(s) running**
|
|
You'll want one or more "workers" running at all times. To do that, use a
|
|
process control system like :ref:`Supervisor <messenger-supervisor>`
|
|
or :ref:`systemd <messenger-systemd>`.
|
|
|
|
**Don't Let Workers Run Forever**
|
|
Some services (like Doctrine's ``EntityManager``) will consume more memory
|
|
over time. So, instead of allowing your worker to run forever, use a flag
|
|
like ``messenger:consume --limit=10`` to tell your worker to only handle 10
|
|
messages before exiting (then the process manager will create a new process). There
|
|
are also other options like ``--memory-limit=128M`` and ``--time-limit=3600``.
|
|
|
|
**Stopping Workers That Encounter Errors**
|
|
If a worker dependency like your database server is down, or timeout is reached,
|
|
you can try to add :ref:`reconnect logic <middleware-doctrine>`, or just quit
|
|
the worker if it receives too many errors with the ``--failure-limit`` option of
|
|
the ``messenger:consume`` command.
|
|
|
|
**Restart Workers on Deploy**
|
|
Each time you deploy, you'll need to restart all your worker processes so
|
|
that they see the newly deployed code. To do this, run ``messenger:stop-workers``
|
|
on deployment. This will signal to each worker that it should finish the message
|
|
it's currently handling and should shut down gracefully. Then, the process manager
|
|
will create new worker processes. The command uses the :ref:`app <cache-configuration-with-frameworkbundle>`
|
|
cache internally - so make sure this is configured to use an adapter you like.
|
|
|
|
**Use the Same Cache Between Deploys**
|
|
If your deploy strategy involves the creation of new target directories, you
|
|
should set a value for the :ref:`cache.prefix_seed <reference-cache-prefix-seed>`
|
|
configuration option in order to use the same cache namespace between deployments.
|
|
Otherwise, the ``cache.app`` pool will use the value of the ``kernel.project_dir``
|
|
parameter as base for the namespace, which will lead to different namespaces
|
|
each time a new deployment is made.
|
|
|
|
Prioritized Transports
|
|
~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Sometimes certain types of messages should have a higher priority and be handled
|
|
before others. To make this possible, you can create multiple transports and route
|
|
different messages to them. For example:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async_priority_high:
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
options:
|
|
# queue_name is specific to the doctrine transport
|
|
queue_name: high
|
|
|
|
# for AMQP send to a separate exchange then queue
|
|
#exchange:
|
|
# name: high
|
|
#queues:
|
|
# messages_high: ~
|
|
# for redis try "group"
|
|
async_priority_low:
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
options:
|
|
queue_name: low
|
|
|
|
routing:
|
|
'App\Message\SmsNotification': async_priority_low
|
|
'App\Message\NewUserWelcomeEmail': async_priority_high
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
|
|
<framework:options>
|
|
<framework:queue>
|
|
<framework:name>Queue</framework:name>
|
|
</framework:queue>
|
|
</framework:options>
|
|
</framework:transport>
|
|
<framework:transport name="async_priority_low" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
|
|
<option key="queue_name">low</option>
|
|
</framework:transport>
|
|
|
|
<framework:routing message-class="App\Message\SmsNotification">
|
|
<framework:sender service="async_priority_low"/>
|
|
</framework:routing>
|
|
<framework:routing message-class="App\Message\NewUserWelcomeEmail">
|
|
<framework:sender service="async_priority_high"/>
|
|
</framework:routing>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('async_priority_high')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
->options(['queue_name' => 'high']);
|
|
|
|
$messenger->transport('async_priority_low')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
->options(['queue_name' => 'low']);
|
|
|
|
$messenger->routing('App\Message\SmsNotification')->senders(['async_priority_low']);
|
|
$messenger->routing('App\Message\NewUserWelcomeEmail')->senders(['async_priority_high']);
|
|
};
|
|
|
|
You can then run individual workers for each transport or instruct one worker
|
|
to handle messages in a priority order:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console messenger:consume async_priority_high async_priority_low
|
|
|
|
The worker will always first look for messages waiting on ``async_priority_high``. If
|
|
there are none, *then* it will consume messages from ``async_priority_low``.
|
|
|
|
.. _messenger-limit-queues:
|
|
|
|
Limit Consuming to Specific Queues
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Some transports (notably AMQP) have the concept of exchanges and queues. A Symfony
|
|
transport is always bound to an exchange. By default, the worker consumes from all
|
|
queues attached to the exchange of the specified transport. However, there are use
|
|
cases to want a worker to only consume from specific queues.
|
|
|
|
You can limit the worker to only process messages from specific queue(s):
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console messenger:consume my_transport --queues=fasttrack
|
|
|
|
# you can pass the --queues option more than once to process multiple queues
|
|
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2
|
|
|
|
.. note::
|
|
|
|
To allow using the ``queues`` option, the receiver must implement the
|
|
:class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\QueueReceiverInterface`.
|
|
|
|
.. _messenger-message-count:
|
|
|
|
Checking the Number of Queued Messages Per Transport
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Run the ``messenger:stats`` command to know how many messages are in the "queues"
|
|
of some or all transports:
|
|
|
|
.. code-block:: terminal
|
|
|
|
# displays the number of queued messages in all transports
|
|
$ php bin/console messenger:stats
|
|
|
|
# shows stats only for some transports
|
|
$ php bin/console messenger:stats my_transport_name other_transport_name
|
|
|
|
# you can also output the stats in JSON format
|
|
$ php bin/console messenger:stats --format=json
|
|
$ php bin/console messenger:stats my_transport_name other_transport_name --format=json
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
The ``format`` option was introduced in Symfony 7.2.
|
|
|
|
.. note::
|
|
|
|
In order for this command to work, the configured transport's receiver must implement
|
|
:class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\MessageCountAwareInterface`.
|
|
|
|
.. _messenger-supervisor:
|
|
|
|
Supervisor Configuration
|
|
~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Supervisor is a great tool to guarantee that your worker process(es) is
|
|
*always* running (even if it closes due to failure, hitting a message limit
|
|
or thanks to ``messenger:stop-workers``). You can install it on Ubuntu, for
|
|
example, via:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ sudo apt-get install supervisor
|
|
|
|
Supervisor configuration files typically live in a ``/etc/supervisor/conf.d``
|
|
directory. For example, you can create a new ``messenger-worker.conf`` file
|
|
there to make sure that 2 instances of ``messenger:consume`` are running at all
|
|
times:
|
|
|
|
.. code-block:: ini
|
|
|
|
;/etc/supervisor/conf.d/messenger-worker.conf
|
|
[program:messenger-consume]
|
|
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
|
|
user=ubuntu
|
|
numprocs=2
|
|
startsecs=0
|
|
autostart=true
|
|
autorestart=true
|
|
startretries=10
|
|
process_name=%(program_name)s_%(process_num)02d
|
|
|
|
Change the ``async`` argument to use the name of your transport (or transports)
|
|
and ``user`` to the Unix user on your server.
|
|
|
|
.. warning::
|
|
|
|
During a deployment, something might be unavailable (e.g. the
|
|
database) causing the consumer to fail to start. In this situation,
|
|
Supervisor will try ``startretries`` number of times to restart the
|
|
command. Make sure to change this setting to avoid getting the command
|
|
in a FATAL state, which will never restart again.
|
|
|
|
Each restart, Supervisor increases the delay by 1 second. For instance, if
|
|
the value is ``10``, it will wait 1 sec, 2 sec, 3 sec, etc. This gives the
|
|
service a total of 55 seconds to become available again. Increase the
|
|
``startretries`` setting to cover the maximum expected downtime.
|
|
|
|
If you use the Redis Transport, note that each worker needs a unique consumer
|
|
name to avoid the same message being handled by multiple workers. One way to
|
|
achieve this is to set an environment variable in the Supervisor configuration
|
|
file, which you can then refer to in ``messenger.yaml``
|
|
(see the :ref:`Redis section <messenger-redis-transport>` below):
|
|
|
|
.. code-block:: ini
|
|
|
|
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
|
|
|
|
Next, tell Supervisor to read your config and start your workers:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ sudo supervisorctl reread
|
|
|
|
$ sudo supervisorctl update
|
|
|
|
$ sudo supervisorctl start messenger-consume:*
|
|
|
|
# If you deploy an update of your code, don't forget to restart your workers
|
|
# to run the new code
|
|
$ sudo supervisorctl restart messenger-consume:*
|
|
|
|
See the `Supervisor docs`_ for more details.
|
|
|
|
Graceful Shutdown
|
|
.................
|
|
|
|
If you install the `PCNTL`_ PHP extension in your project, workers will handle
|
|
the ``SIGTERM`` or ``SIGINT`` POSIX signals to finish processing their current
|
|
message before terminating.
|
|
|
|
However, you might prefer to use different POSIX signals for graceful shutdown.
|
|
You can override default ones by setting the ``framework.messenger.stop_worker_on_signals``
|
|
configuration option:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
stop_worker_on_signals:
|
|
- SIGTERM
|
|
- SIGINT
|
|
- SIGUSR1
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<!-- ... -->
|
|
<framework:stop-worker-on-signal>SIGTERM</framework:stop-worker-on-signal>
|
|
<framework:stop-worker-on-signal>SIGINT</framework:stop-worker-on-signal>
|
|
<framework:stop-worker-on-signal>SIGUSR1</framework:stop-worker-on-signal>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$framework->messenger()
|
|
->stopWorkerOnSignals(['SIGTERM', 'SIGINT', 'SIGUSR1']);
|
|
};
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
Support for signals plain names in configuration was introduced in Symfony 7.3.
|
|
Previously, you had to use the numeric values of signals as defined by the
|
|
``pcntl`` extension's `predefined constants`_.
|
|
|
|
In some cases the ``SIGTERM`` signal is sent by Supervisor itself (e.g. stopping
|
|
a Docker container having Supervisor as its entrypoint). In these cases you
|
|
need to add a ``stopwaitsecs`` key to the program configuration (with a value
|
|
of the desired grace period in seconds) in order to perform a graceful shutdown:
|
|
|
|
.. code-block:: ini
|
|
|
|
[program:x]
|
|
stopwaitsecs=20
|
|
|
|
.. _messenger-systemd:
|
|
|
|
Systemd Configuration
|
|
~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
While Supervisor is a great tool, it has the disadvantage that you need system
|
|
access to run it. Systemd has become the standard on most Linux distributions,
|
|
and has a good alternative called *user services*.
|
|
|
|
Systemd user service configuration files typically live in a ``~/.config/systemd/user``
|
|
directory. For example, you can create a new ``messenger-worker.service`` file. Or a
|
|
``messenger-worker@.service`` file if you want more instances running at the same time:
|
|
|
|
.. code-block:: ini
|
|
|
|
[Unit]
|
|
Description=Symfony messenger-consume %i
|
|
|
|
[Service]
|
|
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
|
|
# for Redis, set a custom consumer name for each instance
|
|
Environment="MESSENGER_CONSUMER_NAME=symfony-%n-%i"
|
|
Restart=always
|
|
RestartSec=30
|
|
|
|
[Install]
|
|
WantedBy=default.target
|
|
|
|
Now, tell systemd to enable and start one worker:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ systemctl --user enable messenger-worker@1.service
|
|
$ systemctl --user start messenger-worker@1.service
|
|
|
|
# to enable and start 20 workers
|
|
$ systemctl --user enable messenger-worker@{1..20}.service
|
|
$ systemctl --user start messenger-worker@{1..20}.service
|
|
|
|
If you change your service config file, you need to reload the daemon:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ systemctl --user daemon-reload
|
|
|
|
To restart all your consumers:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ systemctl --user restart messenger-consume@*.service
|
|
|
|
The systemd user instance is only started after the first login of the
|
|
particular user. Consumer often need to start on system boot instead.
|
|
Enable lingering on the user to activate that behavior:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ loginctl enable-linger <your-username>
|
|
|
|
Logs are managed by journald and can be worked with using the journalctl
|
|
command:
|
|
|
|
.. code-block:: terminal
|
|
|
|
# follow logs of consumer nr 11
|
|
$ journalctl -f --user-unit messenger-consume@11.service
|
|
|
|
# follow logs of all consumers
|
|
$ journalctl -f --user-unit messenger-consume@*
|
|
|
|
# follow all logs from your user services
|
|
$ journalctl -f _UID=$UID
|
|
|
|
See the `systemd docs`_ for more details.
|
|
|
|
.. note::
|
|
|
|
You either need elevated privileges for the ``journalctl`` command, or add
|
|
your user to the systemd-journal group:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ sudo usermod -a -G systemd-journal <your-username>
|
|
|
|
Stateless Worker
|
|
~~~~~~~~~~~~~~~~
|
|
|
|
PHP is designed to be stateless, there are no shared resources across different
|
|
requests. In HTTP context PHP cleans everything after sending the response, so
|
|
you can decide to not take care of services that may leak memory.
|
|
|
|
On the other hand, it's common for workers to process messages sequentially in
|
|
long-running CLI processes which don't finish after processing a single message.
|
|
Beware about service states to prevent information and/or memory leakage as
|
|
Symfony will inject the same instance of a service in all messages, preserving
|
|
the internal state of the services.
|
|
|
|
However, certain Symfony services, such as the Monolog
|
|
:ref:`fingers crossed handler <logging-handler-fingers_crossed>`, leak by design.
|
|
Symfony provides a **service reset** feature to solve this problem. When resetting
|
|
the container automatically between two messages, Symfony looks for any services
|
|
implementing :class:`Symfony\\Contracts\\Service\\ResetInterface` (including your
|
|
own services) and calls their ``reset()`` method so they can clean their internal state.
|
|
|
|
If a service is not stateless and you want to reset its properties after each message, then
|
|
the service must implement :class:`Symfony\\Contracts\\Service\\ResetInterface` where you can reset the
|
|
properties in the ``reset()`` method.
|
|
|
|
If you don't want to reset the container, add the ``--no-reset`` option when
|
|
running the ``messenger:consume`` command.
|
|
|
|
.. _messenger-retries-failures:
|
|
|
|
Rate Limited Transport
|
|
~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Sometimes you might need to rate limit your message worker. You can configure a
|
|
rate limiter on a transport (requires the :doc:`RateLimiter component </rate_limiter>`)
|
|
by setting its ``rate_limiter`` option:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async:
|
|
rate_limiter: your_rate_limiter_name
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async">
|
|
<option key="rate_limiter">your_rate_limiter_name</option>
|
|
</framework:transport>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework) {
|
|
$framework->messenger()
|
|
->transport('async')
|
|
->options(['rate_limiter' => 'your_rate_limiter_name'])
|
|
;
|
|
};
|
|
|
|
.. warning::
|
|
|
|
When a rate limiter is configured on a transport, it will block the whole
|
|
worker when the limit is hit. You should make sure you configure a dedicated
|
|
worker for a rate limited transport to avoid other transports to be blocked.
|
|
|
|
Retries & Failures
|
|
------------------
|
|
|
|
If an exception is thrown while consuming a message from a transport it will
|
|
automatically be re-sent to the transport to be tried again. By default, a message
|
|
will be retried 3 times before being discarded or
|
|
:ref:`sent to the failure transport <messenger-failure-transport>`. Each retry
|
|
will also be delayed, in case the failure was due to a temporary issue. All of
|
|
this is configurable for each transport:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async_priority_high:
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
|
|
# default configuration
|
|
retry_strategy:
|
|
max_retries: 3
|
|
# milliseconds delay
|
|
delay: 1000
|
|
# causes the delay to be higher before each retry
|
|
# e.g. 1 second delay, 2 seconds, 4 seconds
|
|
multiplier: 2
|
|
max_delay: 0
|
|
# applies randomness to the delay that can prevent the thundering herd effect
|
|
# the value (between 0 and 1.0) is the percentage of 'delay' that will be added/subtracted
|
|
jitter: 0.1
|
|
# override all of this with a service that
|
|
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
|
|
# service: null
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%?queue_name=high_priority">
|
|
<framework:retry-strategy max-retries="3" delay="1000" multiplier="2" max-delay="0" jitter="0.1"/>
|
|
</framework:transport>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('async_priority_high')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
// default configuration
|
|
->retryStrategy()
|
|
->maxRetries(3)
|
|
// milliseconds delay
|
|
->delay(1000)
|
|
// causes the delay to be higher before each retry
|
|
// e.g. 1 second delay, 2 seconds, 4 seconds
|
|
->multiplier(2)
|
|
->maxDelay(0)
|
|
// applies randomness to the delay that can prevent the thundering herd effect
|
|
// the value (between 0 and 1.0) is the percentage of 'delay' that will be added/subtracted
|
|
->jitter(0.1)
|
|
// override all of this with a service that
|
|
// implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
|
|
->service(null)
|
|
;
|
|
};
|
|
|
|
.. versionadded:: 7.1
|
|
|
|
The ``jitter`` option was introduced in Symfony 7.1.
|
|
|
|
.. tip::
|
|
|
|
Symfony triggers a :class:`Symfony\\Component\\Messenger\\Event\\WorkerMessageRetriedEvent`
|
|
when a message is retried so you can run your own logic.
|
|
|
|
.. note::
|
|
|
|
Thanks to :class:`Symfony\\Component\\Messenger\\Stamp\\SerializedMessageStamp`,
|
|
the serialized form of the message is saved, which prevents to serialize it
|
|
again if the message is later retried.
|
|
|
|
Avoiding Retrying
|
|
~~~~~~~~~~~~~~~~~
|
|
|
|
Sometimes handling a message might fail in a way that you *know* is permanent
|
|
and should not be retried. If you throw
|
|
:class:`Symfony\\Component\\Messenger\\Exception\\UnrecoverableMessageHandlingException`,
|
|
the message will not be retried.
|
|
|
|
.. note::
|
|
|
|
Messages that will not be retried, will still show up in the configured failure transport.
|
|
If you want to avoid that, consider handling the error yourself and let the handler
|
|
successfully end.
|
|
|
|
Forcing Retrying
|
|
~~~~~~~~~~~~~~~~
|
|
|
|
Sometimes handling a message must fail in a way that you *know* is temporary
|
|
and must be retried. If you throw
|
|
:class:`Symfony\\Component\\Messenger\\Exception\\RecoverableMessageHandlingException`,
|
|
the message will always be retried infinitely and ``max_retries`` setting will be ignored.
|
|
|
|
You can define a custom retry delay in milliseconds (e.g., to use the value from
|
|
the ``Retry-After`` header in an HTTP response) by setting the ``retryDelay``
|
|
argument in the constructor of the ``RecoverableMessageHandlingException``.
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
The ``retryDelay`` argument and the ``getRetryDelay()`` method were introduced
|
|
in Symfony 7.2.
|
|
|
|
.. _messenger-failure-transport:
|
|
|
|
Saving & Retrying Failed Messages
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
If a message fails it is retried multiple times (``max_retries``) and then will
|
|
be discarded. To avoid this happening, you can instead configure a ``failure_transport``:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
# after retrying, messages will be sent to the "failed" transport
|
|
failure_transport: failed
|
|
|
|
transports:
|
|
# ... other transports
|
|
|
|
failed: 'doctrine://default?queue_name=failed'
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<!-- after retrying, messages will be sent to the "failed" transport -->
|
|
<framework:messenger failure-transport="failed">
|
|
<!-- ... other transports -->
|
|
|
|
<framework:transport name="failed" dsn="doctrine://default?queue_name=failed"/>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
// after retrying, messages will be sent to the "failed" transport
|
|
$messenger->failureTransport('failed');
|
|
|
|
// ... other transports
|
|
|
|
$messenger->transport('failed')
|
|
->dsn('doctrine://default?queue_name=failed');
|
|
};
|
|
|
|
In this example, if handling a message fails 3 times (default ``max_retries``),
|
|
it will then be sent to the ``failed`` transport. While you *can* use
|
|
``messenger:consume failed`` to consume this like a normal transport, you'll
|
|
usually want to manually view the messages in the failure transport and choose
|
|
to retry them:
|
|
|
|
.. code-block:: terminal
|
|
|
|
# see all messages in the failure transport with a default limit of 50
|
|
$ php bin/console messenger:failed:show
|
|
|
|
# see the 10 first messages
|
|
$ php bin/console messenger:failed:show --max=10
|
|
|
|
# see only App\Message\MyMessage messages
|
|
$ php bin/console messenger:failed:show --class-filter='App\Message\MyMessage'
|
|
|
|
# see the number of messages by message class
|
|
$ php bin/console messenger:failed:show --stats
|
|
|
|
# see details about a specific failure
|
|
$ php bin/console messenger:failed:show 20 -vv
|
|
|
|
# for each message, this command asks whether to retry, skip, or delete
|
|
$ php bin/console messenger:failed:retry -vv
|
|
|
|
# retry specific messages
|
|
$ php bin/console messenger:failed:retry 20 30 --force
|
|
|
|
# remove a message without retrying it
|
|
$ php bin/console messenger:failed:remove 20
|
|
|
|
# remove messages without retrying them and show each message before removing it
|
|
$ php bin/console messenger:failed:remove 20 30 --show-messages
|
|
|
|
# remove all messages in the failure transport
|
|
$ php bin/console messenger:failed:remove --all
|
|
|
|
# remove only App\Message\MyMessage messages
|
|
$ php bin/console messenger:failed:remove --class-filter='App\Message\MyMessage'
|
|
|
|
If the message fails again, it will be re-sent back to the failure transport
|
|
due to the normal :ref:`retry rules <messenger-retries-failures>`. Once the max
|
|
retry has been hit, the message will be discarded permanently.
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
The option to skip a message in the ``messenger:failed:retry`` command was
|
|
introduced in Symfony 7.2
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The option to filter by a message class in the ``messenger:failed:remove`` command was
|
|
introduced in Symfony 7.3
|
|
|
|
Multiple Failed Transports
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Sometimes it is not enough to have a single, global ``failed transport`` configured
|
|
because some messages are more important than others. In those cases, you can
|
|
override the failure transport for only specific transports:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
# after retrying, messages will be sent to the "failed" transport
|
|
# by default if no "failed_transport" is configured inside a transport
|
|
failure_transport: failed_default
|
|
|
|
transports:
|
|
async_priority_high:
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
failure_transport: failed_high_priority
|
|
|
|
# since no failed transport is configured, the one used will be
|
|
# the global "failure_transport" set
|
|
async_priority_low:
|
|
dsn: 'doctrine://default?queue_name=async_priority_low'
|
|
|
|
failed_default: 'doctrine://default?queue_name=failed_default'
|
|
failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<!-- after retrying, messages will be sent to the "failed" transport
|
|
by default if no "failed-transport" is configured inside a transport -->
|
|
<framework:messenger failure-transport="failed_default">
|
|
<framework:transport name="async_priority_high" dsn="%env(MESSENGER_TRANSPORT_DSN)%" failure-transport="failed_high_priority"/>
|
|
<!-- since no "failed_transport" is configured, the one used will be
|
|
the global "failed_transport" set -->
|
|
<framework:transport name="async_priority_low" dsn="doctrine://default?queue_name=async_priority_low"/>
|
|
|
|
<framework:transport name="failed_default" dsn="doctrine://default?queue_name=failed_default"/>
|
|
<framework:transport name="failed_high_priority" dsn="doctrine://default?queue_name=failed_high_priority"/>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
// after retrying, messages will be sent to the "failed" transport
|
|
// by default if no "failure_transport" is configured inside a transport
|
|
$messenger->failureTransport('failed_default');
|
|
|
|
$messenger->transport('async_priority_high')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
->failureTransport('failed_high_priority');
|
|
|
|
// since no failed transport is configured, the one used will be
|
|
// the global failure_transport set
|
|
$messenger->transport('async_priority_low')
|
|
->dsn('doctrine://default?queue_name=async_priority_low');
|
|
|
|
$messenger->transport('failed_default')
|
|
->dsn('doctrine://default?queue_name=failed_default');
|
|
|
|
$messenger->transport('failed_high_priority')
|
|
->dsn('doctrine://default?queue_name=failed_high_priority');
|
|
};
|
|
|
|
If there is no ``failure_transport`` defined globally or on the transport level,
|
|
the messages will be discarded after the number of retries.
|
|
|
|
The failed commands have an optional option ``--transport`` to specify
|
|
the ``failure_transport`` configured at the transport level.
|
|
|
|
.. code-block:: terminal
|
|
|
|
# see all messages in "failure_transport" transport
|
|
$ php bin/console messenger:failed:show --transport=failure_transport
|
|
|
|
# retry specific messages from "failure_transport"
|
|
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
|
|
|
|
# remove a message without retrying it from "failure_transport"
|
|
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
|
|
|
|
.. _messenger-transports-config:
|
|
|
|
Transport Configuration
|
|
-----------------------
|
|
|
|
Messenger supports a number of different transport types, each with their own
|
|
options. Options can be passed to the transport via a DSN string or configuration.
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=amqp://localhost/%2f/messages?auto_setup=false
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
my_transport:
|
|
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
|
|
options:
|
|
auto_setup: false
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="my_transport" dsn="%env(MESSENGER_TRANSPORT_DSN)%">
|
|
<framework:options auto-setup="false"/>
|
|
</framework:transport>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('my_transport')
|
|
->dsn(env('MESSENGER_TRANSPORT_DSN'))
|
|
->options(['auto_setup' => false]);
|
|
};
|
|
|
|
Options defined under ``options`` take precedence over ones defined in the DSN.
|
|
|
|
AMQP Transport
|
|
~~~~~~~~~~~~~~
|
|
|
|
The AMQP transport uses the AMQP PHP extension to send messages to queues like
|
|
RabbitMQ. Install it by running:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/amqp-messenger
|
|
|
|
The AMQP transport DSN may look like this:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
|
|
|
|
# or use the AMQPS protocol
|
|
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages
|
|
|
|
If you want to use TLS/SSL encrypted AMQP, you must also provide a CA certificate.
|
|
Define the certificate path in the ``amqp.cacert`` PHP.ini setting
|
|
(e.g. ``amqp.cacert = /etc/ssl/certs``) or in the ``cacert`` parameter of the
|
|
DSN (e.g ``amqps://localhost?cacert=/etc/ssl/certs/``).
|
|
|
|
The default port used by TLS/SSL encrypted AMQP is 5671, but you can overwrite
|
|
it in the ``port`` parameter of the DSN (e.g. ``amqps://localhost?cacert=/etc/ssl/certs/&port=12345``).
|
|
|
|
.. note::
|
|
|
|
By default, the transport will automatically create any exchanges, queues and
|
|
binding keys that are needed. That can be disabled, but some functionality
|
|
may not work correctly (like delayed queues).
|
|
To not autocreate any queues, you can configure a transport with ``queues: []``.
|
|
|
|
.. note::
|
|
|
|
You can limit the consumer of an AMQP transport to only process messages
|
|
from some queues of an exchange. See :ref:`messenger-limit-queues`.
|
|
|
|
The transport has a number of other options, including ways to configure
|
|
the exchange, queues binding keys and more. See the documentation on
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\Connection`.
|
|
|
|
The transport has a number of options:
|
|
|
|
``auto_setup`` (default: ``true``)
|
|
Whether the exchanges and queues should be created automatically during
|
|
send / get.
|
|
|
|
``cacert``
|
|
Path to the CA cert file in PEM format.
|
|
|
|
``cert``
|
|
Path to the client certificate in PEM format.
|
|
|
|
``channel_max``
|
|
Specifies highest channel number that the server permits. 0 means standard
|
|
extension limit
|
|
|
|
``confirm_timeout``
|
|
Timeout in seconds for confirmation; if none specified, transport will not
|
|
wait for message confirmation. Note: 0 or greater seconds. May be
|
|
fractional.
|
|
|
|
``connect_timeout``
|
|
Connection timeout. Note: 0 or greater seconds. May be fractional.
|
|
|
|
``frame_max``
|
|
The largest frame size that the server proposes for the connection,
|
|
including frame header and end-byte. 0 means standard extension limit
|
|
(depends on librabbimq default frame size limit)
|
|
|
|
``heartbeat``
|
|
The delay, in seconds, of the connection heartbeat that the server wants. 0
|
|
means the server does not want a heartbeat. Note, librabbitmq has limited
|
|
heartbeat support, which means heartbeats checked only during blocking
|
|
calls.
|
|
|
|
``host``
|
|
Hostname of the AMQP service
|
|
|
|
``key``
|
|
Path to the client key in PEM format.
|
|
|
|
``login``
|
|
Username to use to connect the AMQP service
|
|
|
|
``password``
|
|
Password to use to connect to the AMQP service
|
|
|
|
``persistent`` (default: ``'false'``)
|
|
Whether the connection is persistent
|
|
|
|
``port``
|
|
Port of the AMQP service
|
|
|
|
``read_timeout``
|
|
Timeout in for income activity. Note: 0 or greater seconds. May be
|
|
fractional.
|
|
|
|
``retry``
|
|
(no description available)
|
|
|
|
``sasl_method``
|
|
(no description available)
|
|
|
|
``connection_name``
|
|
For custom connection names (requires at least version 1.10 of the PHP AMQP
|
|
extension)
|
|
|
|
``verify``
|
|
Enable or disable peer verification. If peer verification is enabled then
|
|
the common name in the server certificate must match the server name. Peer
|
|
verification is enabled by default.
|
|
|
|
``vhost``
|
|
Virtual Host to use with the AMQP service
|
|
|
|
``write_timeout``
|
|
Timeout in for outcome activity. Note: 0 or greater seconds. May be
|
|
fractional.
|
|
|
|
``delay[queue_name_pattern]`` (default: ``delay_%exchange_name%_%routing_key%_%delay%``)
|
|
Pattern to use to create the queues
|
|
|
|
``delay[exchange_name]`` (default: ``delays``)
|
|
Name of the exchange to be used for the delayed/retried messages
|
|
|
|
``queues[name][arguments]``
|
|
Extra arguments
|
|
|
|
``queues[name][binding_arguments]``
|
|
Arguments to be used while binding the queue.
|
|
|
|
``queues[name][binding_keys]``
|
|
The binding keys (if any) to bind to this queue
|
|
|
|
``queues[name][flags]`` (default: ``AMQP_DURABLE``)
|
|
Queue flags
|
|
|
|
``exchange[arguments]``
|
|
Extra arguments for the exchange (e.g. ``alternate-exchange``)
|
|
|
|
``exchange[default_publish_routing_key]``
|
|
Routing key to use when publishing, if none is specified on the message
|
|
|
|
``exchange[flags]`` (default: ``AMQP_DURABLE``)
|
|
Exchange flags
|
|
|
|
``exchange[name]``
|
|
Name of the exchange. Use an empty string to use the default exchange.
|
|
|
|
``exchange[type]`` (default: ``fanout``)
|
|
Type of exchange
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
Empty string support for ``exchange[name]`` was introduced in Symfony 7.3.
|
|
|
|
You can also configure AMQP-specific settings on your message by adding
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpStamp` to
|
|
your Envelope::
|
|
|
|
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
|
|
// ...
|
|
|
|
$attributes = [];
|
|
$bus->dispatch(new SmsNotification(), [
|
|
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
|
|
]);
|
|
|
|
The AMQP transport automatically adds a
|
|
:class:`Symfony\\Component\\Messenger\\Stamp\\TransportMessageIdStamp` to
|
|
messages when they are sent and received. This stamp tracks the AMQP message
|
|
ID, which improves logging context when messages fail and are retried.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``TransportMessageIdStamp`` support in the AMQP transport was
|
|
introduced in Symfony 7.3.
|
|
|
|
.. warning::
|
|
|
|
The consumers do not show up in an admin panel as this transport does not rely on
|
|
``\AmqpQueue::consume()`` which is blocking. Having a blocking receiver makes
|
|
the ``--time-limit/--memory-limit`` options of the ``messenger:consume`` command as well as
|
|
the ``messenger:stop-workers`` command inefficient, as they all rely on the fact that
|
|
the receiver returns immediately no matter if it finds a message or not. The consume
|
|
worker is responsible for iterating until it receives a message to handle and/or until one
|
|
of the stop conditions is reached. Therefore, the worker's stop logic cannot be reached if it
|
|
is stuck in a blocking call.
|
|
|
|
.. tip::
|
|
|
|
If your application faces socket exceptions or `high connection churn`_
|
|
(shown by the rapid creation and deletion of connections), consider using
|
|
`AMQProxy`_. This tool works as a gateway between Symfony Messenger and AMQP server,
|
|
maintaining stable connections and minimizing overheads (which also improves
|
|
the overall performance).
|
|
|
|
Doctrine Transport
|
|
~~~~~~~~~~~~~~~~~~
|
|
|
|
The Doctrine transport can be used to store messages in a database table.
|
|
Install it by running:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/doctrine-messenger
|
|
|
|
The Doctrine transport DSN may look like this:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=doctrine://default
|
|
|
|
The format is ``doctrine://<connection_name>``, in case you have multiple connections
|
|
and want to use one other than the "default". The transport will automatically create
|
|
a table named ``messenger_messages``.
|
|
|
|
If you want to change the default table name, pass a custom table name in the
|
|
DSN by using the ``table_name`` option:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name
|
|
|
|
Or, to create the table yourself, set the ``auto_setup`` option to ``false`` and
|
|
:ref:`generate a migration <doctrine-creating-the-database-tables-schema>`.
|
|
|
|
The transport has a number of options:
|
|
|
|
``table_name`` (default: ``messenger_messages``)
|
|
Name of the table
|
|
|
|
``queue_name`` (default: ``default``)
|
|
Name of the queue (a column in the table, to use one table for multiple
|
|
transports)
|
|
|
|
``redeliver_timeout`` (default: ``3600``)
|
|
Timeout before retrying a message that's in the queue but in the "handling"
|
|
state (if a worker stopped for some reason, this will occur, eventually you
|
|
should retry the message) - in seconds.
|
|
|
|
.. note::
|
|
|
|
Set ``redeliver_timeout`` to a greater value than your longest message
|
|
duration. Otherwise, some messages will start a second time while the
|
|
first one is still being handled.
|
|
|
|
``auto_setup``
|
|
Whether the table should be created automatically during send / get.
|
|
|
|
When using PostgreSQL, you have access to the following options to leverage
|
|
the `LISTEN/NOTIFY`_ feature. This allow for a more performant approach
|
|
than the default polling behavior of the Doctrine transport because
|
|
PostgreSQL will directly notify the workers when a new message is inserted
|
|
in the table.
|
|
|
|
``use_notify`` (default: ``true``)
|
|
Whether to use LISTEN/NOTIFY.
|
|
|
|
``check_delayed_interval`` (default: ``60000``)
|
|
The interval to check for delayed messages, in milliseconds. Set to 0 to
|
|
disable checks.
|
|
|
|
``get_notify_timeout`` (default: ``0``)
|
|
The length of time to wait for a response when calling
|
|
``PDO::pgsqlGetNotify``, in milliseconds.
|
|
|
|
The Doctrine transport supports the ``--keepalive`` option by periodically updating
|
|
the ``delivered_at`` timestamp to prevent the message from being redelivered.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
Keepalive support was introduced in Symfony 7.3.
|
|
|
|
Beanstalkd Transport
|
|
~~~~~~~~~~~~~~~~~~~~
|
|
|
|
The Beanstalkd transport sends messages directly to a Beanstalkd work queue. Install
|
|
it by running:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/beanstalkd-messenger
|
|
|
|
The Beanstalkd transport DSN may looks like this:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost:11300?tube_name=foo&timeout=4&ttr=120
|
|
|
|
# If no port, it will default to 11300
|
|
MESSENGER_TRANSPORT_DSN=beanstalkd://localhost
|
|
|
|
The transport has a number of options:
|
|
|
|
``bury_on_reject`` (default: ``false``)
|
|
When set to ``true``, rejected messages are placed into a "buried" state
|
|
in Beanstalkd instead of being deleted.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``bury_on_reject`` option was introduced in Symfony 7.3.
|
|
|
|
``timeout`` (default: ``0``)
|
|
Message reservation timeout - in seconds. 0 will cause the server to
|
|
immediately return either a response or a TransportException will be thrown.
|
|
|
|
``ttr`` (default: ``90``)
|
|
The message time to run before it is put back in the ready queue - in
|
|
seconds.
|
|
|
|
``tube_name`` (default: ``default``)
|
|
Name of the queue
|
|
|
|
The Beanstalkd transport supports the ``--keepalive`` option by using Beanstalkd's
|
|
``touch`` command to periodically reset the job's ``ttr``.
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
Keepalive support was introduced in Symfony 7.2.
|
|
|
|
The Beanstalkd transport lets you set the priority of the messages being dispatched.
|
|
Use the :class:`Symfony\\Component\\Messenger\\Bridge\\Beanstalkd\\Transport\\BeanstalkdPriorityStamp`
|
|
and pass a number to specify the priority (default = ``1024``; lower numbers mean higher priority)::
|
|
|
|
use App\Message\SomeMessage;
|
|
use Symfony\Component\Messenger\Stamp\BeanstalkdPriorityStamp;
|
|
|
|
$this->bus->dispatch(new SomeMessage('some data'), [
|
|
// 0 = highest priority
|
|
// 2**32 - 1 = lowest priority
|
|
new BeanstalkdPriorityStamp(0),
|
|
]);
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
``BeanstalkdPriorityStamp`` support was introduced in Symfony 7.3.
|
|
|
|
.. _messenger-redis-transport:
|
|
|
|
Redis Transport
|
|
~~~~~~~~~~~~~~~
|
|
|
|
The Redis transport uses `streams`_ to queue messages. This transport requires
|
|
the Redis PHP extension (>=4.3) and a running Redis server (^5.0). Install it by
|
|
running:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/redis-messenger
|
|
|
|
The Redis transport DSN may looks like this:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
|
|
# Full DSN Example
|
|
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
|
|
# Redis Cluster Example
|
|
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
|
|
# Unix Socket Example
|
|
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
|
|
# TLS Example
|
|
MESSENGER_TRANSPORT_DSN=rediss://localhost:6379/messages
|
|
# Multiple Redis Sentinel Hosts Example
|
|
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db
|
|
|
|
A number of options can be configured via the DSN or via the ``options`` key
|
|
under the transport in ``messenger.yaml``:
|
|
|
|
``stream`` (default: ``messages``)
|
|
The Redis stream name
|
|
|
|
``group`` (default: ``symfony``)
|
|
The Redis consumer group name
|
|
|
|
``consumer`` (default: ``consumer``)
|
|
Consumer name used in Redis. Allows setting an explicit consumer name identifier.
|
|
Recommended in environments with multiple workers to prevent duplicate message
|
|
processing. Typically set via an environment variable:
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
redis:
|
|
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
|
options:
|
|
consumer: '%env(MESSENGER_CONSUMER_NAME)%'
|
|
|
|
``auto_setup`` (default: ``true``)
|
|
Whether to create the Redis group automatically
|
|
|
|
``auth``
|
|
The Redis password
|
|
|
|
``delete_after_ack`` (default: ``true``)
|
|
If ``true``, messages are deleted automatically after processing them
|
|
|
|
``delete_after_reject`` (default: ``true``)
|
|
If ``true``, messages are deleted automatically if they are rejected
|
|
|
|
``lazy`` (default: ``false``)
|
|
Connect only when a connection is really needed
|
|
|
|
``serializer`` (default: ``Redis::SERIALIZER_PHP``)
|
|
How to serialize the final payload in Redis (the ``Redis::OPT_SERIALIZER`` option)
|
|
|
|
``stream_max_entries`` (default: ``0``)
|
|
The maximum number of entries which the stream will be trimmed to. Set it to
|
|
a large enough number to avoid losing pending messages
|
|
|
|
``redeliver_timeout`` (default: ``3600``)
|
|
Timeout (in seconds) before retrying a pending message which is owned by an abandoned consumer
|
|
(if a worker died for some reason, this will occur, eventually you should retry the message).
|
|
|
|
``claim_interval`` (default: ``60000``)
|
|
Interval on which pending/abandoned messages should be checked for to claim - in milliseconds
|
|
|
|
``persistent_id`` (default: ``null``)
|
|
String, if null connection is non-persistent.
|
|
|
|
``retry_interval`` (default: ``0``)
|
|
Int, value in milliseconds
|
|
|
|
``read_timeout`` (default: ``0``)
|
|
Float, value in seconds default indicates unlimited
|
|
|
|
``timeout`` (default: ``0``)
|
|
Connection timeout. Float, value in seconds default indicates unlimited
|
|
|
|
``sentinel_master`` (default: ``null``)
|
|
String, if null or empty Sentinel support is disabled
|
|
|
|
``redis_sentinel`` (default: ``null``)
|
|
An alias of the ``sentinel_master`` option
|
|
|
|
.. versionadded:: 7.1
|
|
|
|
The ``redis_sentinel`` option was introduced in Symfony 7.1.
|
|
|
|
``ssl`` (default: ``null``)
|
|
Map of `SSL context options`_ for the TLS channel. This is useful for example
|
|
to change the requirements for the TLS channel in tests:
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/test/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
redis:
|
|
dsn: "rediss://localhost"
|
|
options:
|
|
ssl:
|
|
allow_self_signed: true
|
|
capture_peer_cert: true
|
|
capture_peer_cert_chain: true
|
|
disable_compression: true
|
|
SNI_enabled: true
|
|
verify_peer: true
|
|
verify_peer_name: true
|
|
|
|
.. warning::
|
|
|
|
There should never be more than one ``messenger:consume`` command running with the same
|
|
combination of ``stream``, ``group`` and ``consumer``, or messages could end up being
|
|
handled more than once. If you run multiple queue workers, ``consumer`` can be set to an
|
|
environment variable, like ``%env(MESSENGER_CONSUMER_NAME)%``, set by Supervisor
|
|
(example below) or any other service used to manage the worker processes.
|
|
In a container environment, the ``HOSTNAME`` can be used as the consumer name, since
|
|
there is only one worker per container/host. If using Kubernetes to orchestrate the
|
|
containers, consider using a ``StatefulSet`` to have stable names.
|
|
|
|
.. tip::
|
|
|
|
Set ``delete_after_ack`` to ``true`` (if you use a single group) or define
|
|
``stream_max_entries`` (if you can estimate how many max entries is acceptable
|
|
in your case) to avoid memory leaks. Otherwise, all messages will remain
|
|
forever in Redis.
|
|
|
|
The Redis transport supports the ``--keepalive`` option by using Redis's ``XCLAIM``
|
|
command to periodically reset the message's idle time to zero.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
Keepalive support was introduced in Symfony 7.3.
|
|
|
|
In Memory Transport
|
|
~~~~~~~~~~~~~~~~~~~
|
|
|
|
The ``in-memory`` transport does not actually deliver messages. Instead, it
|
|
holds them in memory during the request, which can be useful for testing.
|
|
For example, if you have an ``async_priority_normal`` transport, you could
|
|
override it in the ``test`` environment to use this transport:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/test/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async_priority_normal: 'in-memory://'
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/test/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async_priority_normal" dsn="in-memory://"/>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/test/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('async_priority_normal')
|
|
->dsn('in-memory://');
|
|
};
|
|
|
|
Then, while testing, messages will *not* be delivered to the real transport.
|
|
Even better, in a test, you can check that exactly one message was sent
|
|
during a request::
|
|
|
|
// tests/Controller/DefaultControllerTest.php
|
|
namespace App\Tests\Controller;
|
|
|
|
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
|
|
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
|
|
|
|
class DefaultControllerTest extends WebTestCase
|
|
{
|
|
public function testSomething(): void
|
|
{
|
|
$client = static::createClient();
|
|
// ...
|
|
|
|
$this->assertSame(200, $client->getResponse()->getStatusCode());
|
|
|
|
/** @var InMemoryTransport $transport */
|
|
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
|
|
$this->assertCount(1, $transport->getSent());
|
|
}
|
|
}
|
|
|
|
The transport has a number of options:
|
|
|
|
``serialize`` (boolean, default: ``false``)
|
|
Whether to serialize messages or not. This is useful to test an additional
|
|
layer, especially when you use your own message serializer.
|
|
|
|
.. note::
|
|
|
|
All ``in-memory`` transports will be reset automatically after each test **in**
|
|
test classes extending
|
|
:class:`Symfony\\Bundle\\FrameworkBundle\\Test\\KernelTestCase`
|
|
or :class:`Symfony\\Bundle\\FrameworkBundle\\Test\\WebTestCase`.
|
|
|
|
Amazon SQS
|
|
~~~~~~~~~~
|
|
|
|
The Amazon SQS transport is perfect for applications hosted on AWS. Install it by
|
|
running:
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ composer require symfony/amazon-sqs-messenger
|
|
|
|
The SQS transport DSN may looks like this:
|
|
|
|
.. code-block:: env
|
|
|
|
# .env
|
|
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
|
|
MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=disable
|
|
|
|
.. note::
|
|
|
|
The transport will automatically create queues that are needed. This
|
|
can be disabled by setting the ``auto_setup`` option to ``false``.
|
|
|
|
.. tip::
|
|
|
|
Before sending or receiving a message, Symfony needs to convert the queue
|
|
name into an AWS queue URL by calling the ``GetQueueUrl`` API in AWS. This
|
|
extra API call can be avoided by providing a DSN which is the queue URL.
|
|
|
|
The transport has a number of options:
|
|
|
|
``access_key``
|
|
AWS access key (must be urlencoded)
|
|
|
|
``account`` (default: The owner of the credentials)
|
|
Identifier of the AWS account
|
|
|
|
``auto_setup`` (default: ``true``)
|
|
Whether the queue should be created automatically during send / get.
|
|
|
|
``buffer_size`` (default: ``9``)
|
|
Number of messages to prefetch
|
|
|
|
``debug`` (default: ``false``)
|
|
If ``true`` it logs all HTTP requests and responses (it impacts performance)
|
|
|
|
``endpoint`` (default: ``https://sqs.eu-west-1.amazonaws.com``)
|
|
Absolute URL to the SQS service
|
|
|
|
``poll_timeout`` (default: ``0.1``)
|
|
Wait for new message duration in seconds
|
|
|
|
``queue_name`` (default: ``messages``)
|
|
Name of the queue
|
|
|
|
``queue_attributes``
|
|
Attributes of a queue as per `SQS CreateQueue API`_. Array of strings indexed by keys of ``AsyncAws\Sqs\Enum\QueueAttributeName``.
|
|
|
|
``queue_tags``
|
|
Cost allocation tags of a queue as per `SQS CreateQueue API`_. Array of strings indexed by strings.
|
|
|
|
``region`` (default: ``eu-west-1``)
|
|
Name of the AWS region
|
|
|
|
``secret_key``
|
|
AWS secret key (must be urlencoded)
|
|
|
|
``session_token``
|
|
AWS session token
|
|
|
|
``visibility_timeout`` (default: Queue's configuration)
|
|
Amount of seconds the message will not be visible (`Visibility Timeout`_)
|
|
|
|
``wait_time`` (default: ``20``)
|
|
`Long polling`_ duration in seconds
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``queue_attributes`` and ``queue_tags`` options were introduced in Symfony 7.3.
|
|
|
|
.. note::
|
|
|
|
The ``wait_time`` parameter defines the maximum duration Amazon SQS should
|
|
wait until a message is available in a queue before sending a response.
|
|
It helps reducing the cost of using Amazon SQS by eliminating the number
|
|
of empty responses.
|
|
|
|
The ``poll_timeout`` parameter defines the duration the receiver should wait
|
|
before returning null. It avoids blocking other receivers from being called.
|
|
|
|
.. note::
|
|
|
|
If the queue name is suffixed by ``.fifo``, AWS will create a `FIFO queue`_.
|
|
Use the stamp :class:`Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\Transport\\AmazonSqsFifoStamp`
|
|
to define the ``Message group ID`` and the ``Message deduplication ID``.
|
|
|
|
Another possibility is to enable the
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\Middleware\\AddFifoStampMiddleware`.
|
|
If your message implements
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\MessageDeduplicationAwareInterface`,
|
|
the middleware will automatically add the
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\Transport\\AmazonSqsFifoStamp`
|
|
and set the ``Message deduplication ID``. Additionally, if your message implements the
|
|
:class:`Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\MessageGroupAwareInterface`,
|
|
the middleware will automatically set the ``Message group ID`` of the stamp.
|
|
|
|
You can learn more about middlewares in
|
|
:ref:`the dedicated section <messenger_middleware>`.
|
|
|
|
FIFO queues don't support setting a delay per message, a value of ``delay: 0``
|
|
is required in the retry strategy settings.
|
|
|
|
The SQS transport supports the ``--keepalive`` option by using the ``ChangeMessageVisibility``
|
|
action to periodically update the ``VisibilityTimeout`` of the message.
|
|
|
|
.. versionadded:: 7.2
|
|
|
|
Keepalive support was introduced in Symfony 7.2.
|
|
|
|
Serializing Messages
|
|
~~~~~~~~~~~~~~~~~~~~
|
|
|
|
When messages are sent to (and received from) a transport, they're serialized
|
|
using PHP's native ``serialize()`` & ``unserialize()`` functions. You can change
|
|
this globally (or for each transport) to a service that implements
|
|
:class:`Symfony\\Component\\Messenger\\Transport\\Serialization\\SerializerInterface`:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
serializer:
|
|
default_serializer: messenger.transport.symfony_serializer
|
|
symfony_serializer:
|
|
format: json
|
|
context: { }
|
|
|
|
transports:
|
|
async_priority_normal:
|
|
dsn: # ...
|
|
serializer: messenger.transport.symfony_serializer
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:serializer default-serializer="messenger.transport.symfony_serializer">
|
|
<framework:symfony-serializer format="json">
|
|
<framework:context/>
|
|
</framework:symfony-serializer>
|
|
</framework:serializer>
|
|
|
|
<framework:transport name="async_priority_normal" dsn="..." serializer="messenger.transport.symfony_serializer"/>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->serializer()
|
|
->defaultSerializer('messenger.transport.symfony_serializer')
|
|
->symfonySerializer()
|
|
->format('json')
|
|
->context('foo', 'bar');
|
|
|
|
$messenger->transport('async_priority_normal')
|
|
->dsn('...')
|
|
->serializer('messenger.transport.symfony_serializer');
|
|
};
|
|
|
|
The ``messenger.transport.symfony_serializer`` is a built-in service that uses
|
|
the :doc:`Serializer component </serializer>` and can be configured in a few ways.
|
|
If you *do* choose to use the Symfony serializer, you can control the context
|
|
on a case-by-case basis via the :class:`Symfony\\Component\\Messenger\\Stamp\\SerializerStamp`
|
|
(see `Envelopes & Stamps`_).
|
|
|
|
.. tip::
|
|
|
|
When sending/receiving messages to/from another application, you may need
|
|
more control over the serialization process. Using a custom serializer
|
|
provides that control. See `SymfonyCasts' message serializer tutorial`_ for
|
|
details.
|
|
|
|
Closing Connections
|
|
~~~~~~~~~~~~~~~~~~~
|
|
|
|
When using a transport that requires a connection, you can close it by calling the
|
|
:method:`Symfony\\Component\\Messenger\\Transport\\CloseableTransportInterface::close`
|
|
method to free up resources in long-running processes.
|
|
|
|
This interface is implemented by the following transports: AmazonSqs, Amqp, and Redis.
|
|
If you need to close a Doctrine connection, you can do so
|
|
:ref:`using middleware <middleware-for-doctrine>`.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``CloseableTransportInterface`` and its ``close()`` method were introduced
|
|
in Symfony 7.3.
|
|
|
|
Running Commands And External Processes
|
|
---------------------------------------
|
|
|
|
Trigger a Command
|
|
~~~~~~~~~~~~~~~~~
|
|
|
|
It is possible to trigger any command by dispatching a
|
|
:class:`Symfony\\Component\\Console\\Messenger\\RunCommandMessage`. Symfony
|
|
will take care of handling this message and execute the command passed
|
|
to the message parameter::
|
|
|
|
use Symfony\Component\Console\Messenger\RunCommandMessage;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
class CleanUpService
|
|
{
|
|
public function __construct(private readonly MessageBusInterface $bus)
|
|
{
|
|
}
|
|
|
|
public function cleanUp(): void
|
|
{
|
|
// Long task with some caching...
|
|
|
|
// Once finished, dispatch some clean up commands
|
|
$this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
|
|
$this->bus->dispatch(new RunCommandMessage('cache:clear'));
|
|
}
|
|
}
|
|
|
|
You can configure the behavior in the case of something going wrong during command
|
|
execution. To do so, you can use the ``throwOnFailure`` and ``catchExceptions``
|
|
parameters when creating your instance of
|
|
:class:`Symfony\\Component\\Console\\Messenger\\RunCommandMessage`.
|
|
|
|
Once handled, the handler will return a
|
|
:class:`Symfony\\Component\\Console\\Messenger\\RunCommandContext` which
|
|
contains many useful information such as the exit code or the output of the
|
|
process. You can refer to the page dedicated on
|
|
:ref:`handler results <messenger-getting-handler-results>` for more information.
|
|
|
|
Trigger An External Process
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Messenger comes with a handy helper to run external processes by
|
|
dispatching a message. This takes advantages of the
|
|
:doc:`Process component </components/process>`. By dispatching a
|
|
:class:`Symfony\\Component\\Process\\Messenger\\RunProcessMessage`, Messenger
|
|
will take care of creating a new process with the parameters you passed::
|
|
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
use Symfony\Component\Process\Messenger\RunProcessMessage;
|
|
|
|
class CleanUpService
|
|
{
|
|
public function __construct(
|
|
private readonly MessageBusInterface $bus,
|
|
) {
|
|
}
|
|
|
|
public function cleanUp(): void
|
|
{
|
|
$this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));
|
|
|
|
// ...
|
|
}
|
|
}
|
|
|
|
If you want to use shell features such as redirections or pipes, use the static
|
|
:method:`Symfony\\Component\\Process\\Messenger\\RunProcessMessage::fromShellCommandline` factory method::
|
|
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
use Symfony\Component\Process\Messenger\RunProcessMessage;
|
|
|
|
class CleanUpService
|
|
{
|
|
public function __construct(
|
|
private readonly MessageBusInterface $bus,
|
|
) {
|
|
}
|
|
|
|
public function cleanUp(): void
|
|
{
|
|
$this->bus->dispatch(RunProcessMessage::fromShellCommandline('echo "Hello World" > var/log/hello.txt'));
|
|
|
|
// ...
|
|
}
|
|
}
|
|
|
|
For more information, read the documentation about
|
|
:ref:`using features from the OS shell <process-using-features-from-the-os-shell>`.
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``RunProcessMessage::fromShellCommandline()`` method was introduced in Symfony 7.3.
|
|
|
|
Once handled, the handler will return a
|
|
:class:`Symfony\\Component\\Process\\Messenger\\RunProcessContext` which
|
|
contains many useful information such as the exit code or the output of the
|
|
process. You can refer to the page dedicated on
|
|
:ref:`handler results <messenger-getting-handler-results>` for more information.
|
|
|
|
Pinging A Webservice
|
|
--------------------
|
|
|
|
Sometimes, you may need to regularly ping a webservice to get its status, e.g.
|
|
is it up or down. It is possible to do so by dispatching a
|
|
:class:`Symfony\\Component\\HttpClient\\Messenger\\PingWebhookMessage`::
|
|
|
|
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
class LivenessService
|
|
{
|
|
public function __construct(private readonly MessageBusInterface $bus)
|
|
{
|
|
}
|
|
|
|
public function ping(): void
|
|
{
|
|
// An HttpExceptionInterface is thrown on 3xx/4xx/5xx
|
|
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));
|
|
|
|
// Ping, but does not throw on 3xx/4xx/5xx
|
|
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));
|
|
|
|
// Any valid HttpClientInterface option can be used
|
|
$this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
|
|
'headers' => [
|
|
'Authorization' => 'Bearer ...'
|
|
],
|
|
'json' => [
|
|
'data' => 'some-data',
|
|
],
|
|
]));
|
|
}
|
|
}
|
|
|
|
The handler will return a
|
|
:class:`Symfony\\Contracts\\HttpClient\\ResponseInterface`, allowing you to
|
|
gather and process information returned by the HTTP request.
|
|
|
|
Getting Results from your Handlers
|
|
----------------------------------
|
|
|
|
When a message is handled, the :class:`Symfony\\Component\\Messenger\\Middleware\\HandleMessageMiddleware`
|
|
adds a :class:`Symfony\\Component\\Messenger\\Stamp\\HandledStamp` for each object that handled the message.
|
|
You can use this to get the value returned by the handler(s)::
|
|
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
use Symfony\Component\Messenger\Stamp\HandledStamp;
|
|
|
|
$envelope = $messageBus->dispatch(new SomeMessage());
|
|
|
|
// get the value that was returned by the last message handler
|
|
$handledStamp = $envelope->last(HandledStamp::class);
|
|
$handledStamp->getResult();
|
|
|
|
// or get info about all of handlers
|
|
$handledStamps = $envelope->all(HandledStamp::class);
|
|
|
|
.. _messenger-getting-handler-results:
|
|
|
|
Getting Results when Working with Command & Query Buses
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
The Messenger component can be used in CQRS architectures where command & query
|
|
buses are central pieces of the application. Read Martin Fowler's
|
|
`article about CQRS`_ to learn more and
|
|
:ref:`how to configure multiple buses <messenger-multiple-buses>`.
|
|
|
|
As queries are usually synchronous and expected to be handled once,
|
|
getting the result from the handler is a common need.
|
|
|
|
A :class:`Symfony\\Component\\Messenger\\HandleTrait` exists to get the result
|
|
of the handler when processing synchronously. It also ensures that exactly one
|
|
handler is registered. The ``HandleTrait`` can be used in any class that has a
|
|
``$messageBus`` property::
|
|
|
|
// src/Action/ListItems.php
|
|
namespace App\Action;
|
|
|
|
use App\Message\ListItemsQuery;
|
|
use App\MessageHandler\ListItemsQueryResult;
|
|
use Symfony\Component\Messenger\HandleTrait;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
class ListItems
|
|
{
|
|
use HandleTrait;
|
|
|
|
// the parameter must be named $queryBus to trigger autowiring of the 'query.bus' service
|
|
public function __construct(MessageBusInterface $queryBus)
|
|
{
|
|
// HandleTrait requires a property named $messageBus
|
|
$this->messageBus = $queryBus;
|
|
}
|
|
|
|
public function __invoke(): void
|
|
{
|
|
$result = $this->query(new ListItemsQuery(/* ... */));
|
|
|
|
// Do something with the result
|
|
// ...
|
|
}
|
|
|
|
// Creating such a method is optional, but allows type-hinting the result
|
|
private function query(ListItemsQuery $query): ListItemsQueryResult
|
|
{
|
|
return $this->handle($query);
|
|
}
|
|
}
|
|
|
|
Therefore, you can use the trait to create command & query bus classes.
|
|
For example, you could create a special ``QueryBus`` class and inject it
|
|
wherever you need a query bus behavior instead of the ``MessageBusInterface``::
|
|
|
|
// src/MessageBus/QueryBus.php
|
|
namespace App\MessageBus;
|
|
|
|
use Symfony\Component\Messenger\Envelope;
|
|
use Symfony\Component\Messenger\HandleTrait;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
class QueryBus
|
|
{
|
|
use HandleTrait;
|
|
|
|
// the parameter must be named $queryBus to trigger autowiring of the 'query.bus' service
|
|
public function __construct(MessageBusInterface $queryBus)
|
|
{
|
|
// HandleTrait requires a property named $messageBus
|
|
$this->messageBus = $queryBus;
|
|
}
|
|
|
|
/**
|
|
* @param object|Envelope $query
|
|
*
|
|
* @return mixed The handler returned value
|
|
*/
|
|
public function query($query): mixed
|
|
{
|
|
return $this->handle($query);
|
|
}
|
|
}
|
|
|
|
You can also add new stamps when handling a message; they will be appended
|
|
to the existing ones::
|
|
|
|
$this->handle(new SomeMessage($data), [new SomeStamp(), new AnotherStamp()]);
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
The ``$stamps`` parameter of the ``handle()`` method was introduced in Symfony 7.3.
|
|
|
|
Customizing Handlers
|
|
--------------------
|
|
|
|
.. _messenger-handler-config:
|
|
|
|
Manually Configuring Handlers
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Symfony will normally :ref:`find and register your handler automatically <messenger-handler>`.
|
|
But, you can also configure a handler manually - and pass it some extra config -
|
|
while using ``#AsMessageHandler`` attribute or tagging the handler service
|
|
with ``messenger.message_handler``.
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: php-attributes
|
|
|
|
// src/MessageHandler/SmsNotificationHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\OtherSmsNotification;
|
|
use App\Message\SmsNotification;
|
|
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
|
|
|
#[AsMessageHandler(fromTransport: 'async', priority: 10)]
|
|
class SmsNotificationHandler
|
|
{
|
|
public function __invoke(SmsNotification $message): void
|
|
{
|
|
// ...
|
|
}
|
|
}
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/services.yaml
|
|
services:
|
|
App\MessageHandler\SmsNotificationHandler:
|
|
tags: [messenger.message_handler]
|
|
|
|
# or configure with options
|
|
tags:
|
|
-
|
|
name: messenger.message_handler
|
|
# only needed if can't be guessed by type-hint
|
|
handles: App\Message\SmsNotification
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/services.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd">
|
|
|
|
<services>
|
|
<service id="App\MessageHandler\SmsNotificationHandler">
|
|
<!-- handles is only needed if it can't be guessed by type-hint -->
|
|
<tag name="messenger.message_handler"
|
|
handles="App\Message\SmsNotification"/>
|
|
</service>
|
|
</services>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/services.php
|
|
use App\Message\SmsNotification;
|
|
use App\MessageHandler\SmsNotificationHandler;
|
|
|
|
$container->register(SmsNotificationHandler::class)
|
|
->addTag('messenger.message_handler', [
|
|
// only needed if can't be guessed by type-hint
|
|
'handles' => SmsNotification::class,
|
|
]);
|
|
|
|
Possible options to configure with tags are:
|
|
|
|
``bus``
|
|
Name of the bus from which the handler can receive messages, by default all buses.
|
|
|
|
``from_transport``
|
|
Name of the transport from which the handler can receive messages, by default
|
|
all transports.
|
|
|
|
``handles``
|
|
Type of messages (FQCN) that can be processed by the handler, only needed if
|
|
can't be guessed by type-hint.
|
|
|
|
``method``
|
|
Name of the method that will process the message.
|
|
|
|
``priority``
|
|
Defines the order in which the handler is executed when multiple handlers
|
|
can process the same message; those with higher priority run first.
|
|
|
|
.. _handler-subscriber-options:
|
|
|
|
Handling Multiple Messages
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
A single handler class can handle multiple messages. For that add the
|
|
``#AsMessageHandler`` attribute to all the handling methods::
|
|
|
|
// src/MessageHandler/SmsNotificationHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\OtherSmsNotification;
|
|
use App\Message\SmsNotification;
|
|
|
|
class SmsNotificationHandler
|
|
{
|
|
#[AsMessageHandler]
|
|
public function handleSmsNotification(SmsNotification $message): void
|
|
{
|
|
// ...
|
|
}
|
|
|
|
#[AsMessageHandler]
|
|
public function handleOtherSmsNotification(OtherSmsNotification $message): void
|
|
{
|
|
// ...
|
|
}
|
|
}
|
|
|
|
.. _messenger-transactional-messages:
|
|
|
|
Transactional Messages: Handle New Messages After Handling is Done
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
A message handler can ``dispatch`` new messages while handling others, to either
|
|
the same or a different bus (if the application has
|
|
:ref:`multiple buses <messenger-multiple-buses>`). Any errors or exceptions that
|
|
occur during this process can have unintended consequences, such as:
|
|
|
|
#. If using the ``DoctrineTransactionMiddleware`` and a dispatched message throws
|
|
an exception, then any database transactions in the original handler will be
|
|
rolled back.
|
|
#. If the message is dispatched to a different bus, then the dispatched message
|
|
will be handled even if some code later in the current handler throws an exception.
|
|
|
|
An Example ``RegisterUser`` Process
|
|
...................................
|
|
|
|
Consider an application with both a *command* and an *event* bus. The application
|
|
dispatches a command named ``RegisterUser`` to the command bus. The command is
|
|
handled by the ``RegisterUserHandler`` which creates a ``User`` object, stores
|
|
that object to a database and dispatches a ``UserRegistered`` message to the event bus.
|
|
|
|
There are many handlers to the ``UserRegistered`` message, one handler may send
|
|
a welcome email to the new user. We are using the ``DoctrineTransactionMiddleware``
|
|
to wrap all database queries in one database transaction.
|
|
|
|
**Problem 1:** If an exception is thrown when sending the welcome email, then
|
|
the user will not be created because the ``DoctrineTransactionMiddleware`` will
|
|
rollback the Doctrine transaction, in which the user has been created.
|
|
|
|
**Problem 2:** If an exception is thrown when saving the user to the database,
|
|
the welcome email is still sent because it is handled asynchronously.
|
|
|
|
DispatchAfterCurrentBusMiddleware Middleware
|
|
............................................
|
|
|
|
For many applications, the desired behavior is to *only* handle messages that
|
|
are dispatched by a handler once that handler has fully finished. This can be done by
|
|
using the ``DispatchAfterCurrentBusMiddleware`` and adding a
|
|
``DispatchAfterCurrentBusStamp`` stamp to :ref:`the message Envelope <messenger-envelopes>`::
|
|
|
|
// src/Messenger/CommandHandler/RegisterUserHandler.php
|
|
namespace App\Messenger\CommandHandler;
|
|
|
|
use App\Entity\User;
|
|
use App\Messenger\Command\RegisterUser;
|
|
use App\Messenger\Event\UserRegistered;
|
|
use Doctrine\ORM\EntityManagerInterface;
|
|
use Symfony\Component\Messenger\Envelope;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
|
|
|
|
class RegisterUserHandler
|
|
{
|
|
public function __construct(
|
|
private MessageBusInterface $eventBus,
|
|
private EntityManagerInterface $em,
|
|
) {
|
|
}
|
|
|
|
public function __invoke(RegisterUser $command): void
|
|
{
|
|
$user = new User($command->getUuid(), $command->getName(), $command->getEmail());
|
|
$this->em->persist($user);
|
|
|
|
// The DispatchAfterCurrentBusStamp marks the event message to be handled
|
|
// only if this handler does not throw an exception.
|
|
|
|
$event = new UserRegistered($command->getUuid());
|
|
$this->eventBus->dispatch(
|
|
(new Envelope($event))
|
|
->with(new DispatchAfterCurrentBusStamp())
|
|
);
|
|
|
|
// ...
|
|
}
|
|
}
|
|
|
|
.. code-block:: php
|
|
|
|
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
|
|
namespace App\Messenger\EventSubscriber;
|
|
|
|
use App\Entity\User;
|
|
use App\Messenger\Event\UserRegistered;
|
|
use Doctrine\ORM\EntityManagerInterface;
|
|
use Symfony\Component\Mailer\MailerInterface;
|
|
use Symfony\Component\Mime\RawMessage;
|
|
|
|
class WhenUserRegisteredThenSendWelcomeEmail
|
|
{
|
|
public function __construct(
|
|
private MailerInterface $mailer,
|
|
private EntityManagerInterface $em,
|
|
) {
|
|
}
|
|
|
|
public function __invoke(UserRegistered $event): void
|
|
{
|
|
$user = $this->em->getRepository(User::class)->find($event->getUuid());
|
|
|
|
$this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
|
|
}
|
|
}
|
|
|
|
This means that the ``UserRegistered`` message would not be handled until
|
|
*after* the ``RegisterUserHandler`` had completed and the new ``User`` was
|
|
persisted to the database. If the ``RegisterUserHandler`` encounters an
|
|
exception, the ``UserRegistered`` event will never be handled. And if an
|
|
exception is thrown while sending the welcome email, the Doctrine transaction
|
|
will not be rolled back.
|
|
|
|
.. note::
|
|
|
|
If ``WhenUserRegisteredThenSendWelcomeEmail`` throws an exception, that
|
|
exception will be wrapped into a ``DelayedMessageHandlingException``. Using
|
|
``DelayedMessageHandlingException::getWrappedExceptions`` will give you all
|
|
exceptions that are thrown while handling a message with the
|
|
``DispatchAfterCurrentBusStamp``.
|
|
|
|
The ``dispatch_after_current_bus`` middleware is enabled by default. If you're
|
|
configuring your middleware manually, be sure to register
|
|
``dispatch_after_current_bus`` before ``doctrine_transaction`` in the middleware
|
|
chain. Also, the ``dispatch_after_current_bus`` middleware must be loaded for
|
|
*all* of the buses being used.
|
|
|
|
Binding Handlers to Different Transports
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
Each message can have multiple handlers, and when a message is consumed
|
|
*all* of its handlers are called. But you can also configure a handler to only
|
|
be called when it's received from a *specific* transport. This allows you to
|
|
have a single message where each handler is called by a different "worker"
|
|
that's consuming a different transport.
|
|
|
|
Suppose you have an ``UploadedImage`` message with two handlers:
|
|
|
|
* ``ThumbnailUploadedImageHandler``: you want this to be handled by
|
|
a transport called ``image_transport``
|
|
|
|
* ``NotifyAboutNewUploadedImageHandler``: you want this to be handled
|
|
by a transport called ``async_priority_normal``
|
|
|
|
To do this, add the ``from_transport`` option to each handler. For example::
|
|
|
|
// src/MessageHandler/ThumbnailUploadedImageHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\UploadedImage;
|
|
|
|
#[AsMessageHandler(fromTransport: 'image_transport')]
|
|
class ThumbnailUploadedImageHandler
|
|
{
|
|
public function __invoke(UploadedImage $uploadedImage): void
|
|
{
|
|
// do some thumbnailing
|
|
}
|
|
}
|
|
|
|
And similarly::
|
|
|
|
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
|
|
// ...
|
|
|
|
#[AsMessageHandler(fromTransport: 'async_priority_normal')]
|
|
class NotifyAboutNewUploadedImageHandler
|
|
{
|
|
// ...
|
|
}
|
|
|
|
Then, make sure to "route" your message to *both* transports:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
async_priority_normal: # ...
|
|
image_transport: # ...
|
|
|
|
routing:
|
|
# ...
|
|
'App\Message\UploadedImage': [image_transport, async_priority_normal]
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="async_priority_normal" dsn="..."/>
|
|
<framework:transport name="image_transport" dsn="..."/>
|
|
|
|
<framework:routing message-class="App\Message\UploadedImage">
|
|
<framework:sender service="image_transport"/>
|
|
<framework:sender service="async_priority_normal"/>
|
|
</framework:routing>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('async_priority_normal')->dsn('...');
|
|
$messenger->transport('image_transport')->dsn('...');
|
|
|
|
$messenger->routing('App\Message\UploadedImage')
|
|
->senders(['image_transport', 'async_priority_normal']);
|
|
};
|
|
|
|
That's it! You can now consume each transport:
|
|
|
|
.. code-block:: terminal
|
|
|
|
# will only call ThumbnailUploadedImageHandler when handling the message
|
|
$ php bin/console messenger:consume image_transport -vv
|
|
|
|
$ php bin/console messenger:consume async_priority_normal -vv
|
|
|
|
.. warning::
|
|
|
|
If a handler does *not* have ``from_transport`` config, it will be executed
|
|
on *every* transport that the message is received from.
|
|
|
|
Process Messages by Batches
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
You can declare "special" handlers which will process messages by batch.
|
|
By doing so, the handler will wait for a certain amount of messages to be
|
|
pending before processing them. The declaration of a batch handler is done
|
|
by implementing
|
|
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerInterface`. The
|
|
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerTrait` is also
|
|
provided in order to ease the declaration of these special handlers::
|
|
|
|
use Symfony\Component\Messenger\Handler\Acknowledger;
|
|
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
|
|
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
|
|
|
|
class MyBatchHandler implements BatchHandlerInterface
|
|
{
|
|
use BatchHandlerTrait;
|
|
|
|
public function __invoke(MyMessage $message, ?Acknowledger $ack = null): mixed
|
|
{
|
|
return $this->handle($message, $ack);
|
|
}
|
|
|
|
private function process(array $jobs): void
|
|
{
|
|
foreach ($jobs as [$message, $ack]) {
|
|
try {
|
|
// Compute $result from $message...
|
|
|
|
// Acknowledge the processing of the message
|
|
$ack->ack($result);
|
|
} catch (\Throwable $e) {
|
|
$ack->nack($e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Optionally, you can override some of the trait methods, such as the
|
|
// `getBatchSize()` method, to specify your own batch size...
|
|
private function getBatchSize(): int
|
|
{
|
|
return 100;
|
|
}
|
|
}
|
|
|
|
.. note::
|
|
|
|
When the ``$ack`` argument of ``__invoke()`` is ``null``, the message is
|
|
expected to be handled synchronously. Otherwise, ``__invoke()`` is
|
|
expected to return the number of pending messages. The
|
|
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerTrait` handles
|
|
this for you.
|
|
|
|
.. note::
|
|
|
|
By default, pending batches are flushed when the worker is idle as well
|
|
as when it is stopped.
|
|
|
|
Extending Messenger
|
|
-------------------
|
|
|
|
Envelopes & Stamps
|
|
~~~~~~~~~~~~~~~~~~
|
|
|
|
A message can be any PHP object. Sometimes, you may need to configure something
|
|
extra about the message - like the way it should be handled inside AMQP or adding
|
|
a delay before the message should be handled. You can do that by adding a "stamp"
|
|
to your message::
|
|
|
|
use Symfony\Component\Messenger\Envelope;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
|
|
|
public function index(MessageBusInterface $bus): void
|
|
{
|
|
// wait 5 seconds before processing
|
|
$bus->dispatch(new SmsNotification('...'), [
|
|
new DelayStamp(5000),
|
|
]);
|
|
|
|
// or explicitly create an Envelope
|
|
$bus->dispatch(new Envelope(new SmsNotification('...'), [
|
|
new DelayStamp(5000),
|
|
]));
|
|
|
|
// ...
|
|
}
|
|
|
|
Internally, each message is wrapped in an ``Envelope``, which holds the message
|
|
and stamps. You can create this manually or allow the message bus to do it. There
|
|
are a variety of different stamps for different purposes and they're used internally
|
|
to track information about a message - like the message bus that's handling it
|
|
or if it's being retried after failure.
|
|
|
|
.. _messenger_middleware:
|
|
|
|
Middleware
|
|
~~~~~~~~~~
|
|
|
|
What happens when you dispatch a message to a message bus depends on its
|
|
collection of middleware and their order. By default, the middleware configured
|
|
for each bus looks like this:
|
|
|
|
#. ``add_bus_name_stamp_middleware`` - adds a stamp to record which bus this
|
|
message was dispatched into;
|
|
|
|
#. ``dispatch_after_current_bus``- see :ref:`messenger-transactional-messages`;
|
|
|
|
#. ``failed_message_processing_middleware`` - processes messages that are being
|
|
retried via the :ref:`failure transport <messenger-failure-transport>` to make
|
|
them properly function as if they were being received from their original transport;
|
|
|
|
#. Your own collection of middleware_;
|
|
|
|
#. ``send_message`` - if routing is configured for the transport, this sends
|
|
messages to that transport and stops the middleware chain;
|
|
|
|
#. ``handle_message`` - calls the message handler(s) for the given message.
|
|
|
|
.. note::
|
|
|
|
These middleware names are actually shortcut names. The real service ids
|
|
are prefixed with ``messenger.middleware.`` (e.g. ``messenger.middleware.handle_message``).
|
|
|
|
The middleware are executed when the message is dispatched but *also* again when
|
|
a message is received via the worker (for messages that were sent to a transport
|
|
to be handled asynchronously). Keep this in mind if you create your own middleware.
|
|
|
|
You can add your own middleware to this list, or completely disable the default
|
|
middleware and *only* include your own.
|
|
|
|
If a middleware service is abstract, you can configure its constructor's arguments
|
|
and a different instance will be created per bus.
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
buses:
|
|
messenger.bus.default:
|
|
# disable the default middleware
|
|
default_middleware: false
|
|
|
|
middleware:
|
|
# use and configure parts of the default middleware you want
|
|
- 'add_bus_name_stamp_middleware': ['messenger.bus.default']
|
|
|
|
# add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
|
|
- 'App\Middleware\MyMiddleware'
|
|
- 'App\Middleware\AnotherMiddleware'
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<!-- default-middleware: disable the default middleware -->
|
|
<framework:bus name="messenger.bus.default" default-middleware="false">
|
|
|
|
<!-- use and configure parts of the default middleware you want -->
|
|
<framework:middleware id="add_bus_name_stamp_middleware">
|
|
<framework:argument>messenger.bus.default</framework:argument>
|
|
</framework:middleware>
|
|
|
|
<!-- add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface -->
|
|
<framework:middleware id="App\Middleware\MyMiddleware"/>
|
|
<framework:middleware id="App\Middleware\AnotherMiddleware"/>
|
|
</framework:bus>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$bus = $messenger->bus('messenger.bus.default')
|
|
->defaultMiddleware(false); // disable the default middleware
|
|
|
|
// use and configure parts of the default middleware you want
|
|
$bus->middleware()->id('add_bus_name_stamp_middleware')->arguments(['messenger.bus.default']);
|
|
|
|
// add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
|
|
$bus->middleware()->id('App\Middleware\MyMiddleware');
|
|
$bus->middleware()->id('App\Middleware\AnotherMiddleware');
|
|
};
|
|
|
|
.. tip::
|
|
|
|
If you have installed the MakerBundle, you can use the ``make:messenger-middleware``
|
|
command to bootstrap the creation of your own messenger middleware.
|
|
|
|
Message Deduplication
|
|
~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
.. versionadded:: 7.3
|
|
|
|
Message deduplication was introduced in Symfony 7.3.
|
|
|
|
Symfony provides a middleware to prevent the same message from being
|
|
dispatched or processed multiple times using :doc:`locks </lock>`.
|
|
|
|
This behavior is enabled by adding a
|
|
:class:`Symfony\\Component\\Messenger\\Stamp\\DeduplicateStamp`
|
|
to the message envelope. The middleware uses the stamp ``key`` to determine
|
|
whether a message should be skipped::
|
|
|
|
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
|
|
|
|
$message = new MyMessage($projectId);
|
|
|
|
// prevent processing multiple messages for the same project at the same time
|
|
$deduplicationKey = 'my_message.project.'.$projectId;
|
|
|
|
$bus->dispatch($message, [
|
|
new DeduplicateStamp($deduplicationKey),
|
|
]);
|
|
|
|
// use the second argument to define the TTL of the lock (300 seconds by default)
|
|
new DeduplicateStamp($deduplicationKey, 3600),
|
|
|
|
The deduplication key is a **business-level choice**: it encodes what "same message"
|
|
means for your application. It does not need to be globally unique, but it should
|
|
identify when two messages should be considered duplicates (for example, using a
|
|
project ID, an order ID, or a combination of relevant fields).
|
|
|
|
By default, deduplication applies while the message is in the queue and while
|
|
it is being processed. The lock is released when processing finishes. If you want
|
|
deduplication only while the message is queued, set the third argument to ``true``::
|
|
|
|
new DeduplicateStamp($deduplicationKey, 300, true)
|
|
|
|
In this mode, the lock is released as soon as the worker receives the message,
|
|
so another message with the same key can be processed concurrently.
|
|
|
|
.. note::
|
|
|
|
This middleware is automatically enabled when the :doc:`Lock component </lock>`
|
|
is installed.
|
|
|
|
.. _middleware-doctrine:
|
|
|
|
Middleware for Doctrine
|
|
~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
If you use Doctrine in your app, a number of optional middleware exist that you
|
|
may want to use:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
buses:
|
|
command_bus:
|
|
middleware:
|
|
# each time a message is handled, the Doctrine connection
|
|
# is "pinged" and reconnected if it's closed. Useful
|
|
# if your workers run for a long time and the database
|
|
# connection is sometimes lost
|
|
- doctrine_ping_connection
|
|
|
|
# After handling, the Doctrine connection is closed,
|
|
# which can free up database connections in a worker,
|
|
# instead of keeping them open forever
|
|
- doctrine_close_connection
|
|
|
|
# logs an error when a Doctrine transaction was opened but not closed
|
|
- doctrine_open_transaction_logger
|
|
|
|
# wraps all handlers in a single Doctrine transaction
|
|
# handlers do not need to call flush() and an error
|
|
# in any handler will cause a rollback
|
|
- doctrine_transaction
|
|
|
|
# or pass a different entity manager to any
|
|
#- doctrine_transaction: ['custom']
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:bus name="command_bus">
|
|
<framework:middleware id="doctrine_transaction"/>
|
|
<framework:middleware id="doctrine_ping_connection"/>
|
|
<framework:middleware id="doctrine_close_connection"/>
|
|
<framework:middleware id="doctrine_open_transaction_logger"/>
|
|
|
|
<!-- or pass a different entity manager to any -->
|
|
<!--
|
|
<framework:middleware id="doctrine_transaction">
|
|
<framework:argument>custom</framework:argument>
|
|
</framework:middleware>
|
|
-->
|
|
</framework:bus>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$bus = $messenger->bus('command_bus');
|
|
$bus->middleware()->id('doctrine_transaction');
|
|
$bus->middleware()->id('doctrine_ping_connection');
|
|
$bus->middleware()->id('doctrine_close_connection');
|
|
$bus->middleware()->id('doctrine_open_transaction_logger');
|
|
// Using another entity manager
|
|
$bus->middleware()->id('doctrine_transaction')
|
|
->arguments(['custom']);
|
|
};
|
|
|
|
Other Middlewares
|
|
~~~~~~~~~~~~~~~~~
|
|
|
|
Add the ``router_context`` middleware if you need to generate absolute URLs in
|
|
the consumer (e.g. render a template with links). This middleware stores the
|
|
original request context (i.e. the host, the HTTP port, etc.) which is needed
|
|
when building absolute URLs.
|
|
|
|
Add the ``validation`` middleware if you need to validate the message
|
|
object using the :doc:`Validator component </components/validator>` before handling it.
|
|
If validation fails, a ``ValidationFailedException`` will be thrown. The
|
|
:class:`Symfony\\Component\\Messenger\\Stamp\\ValidationStamp` can be used
|
|
to configure the validation groups.
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
buses:
|
|
command_bus:
|
|
middleware:
|
|
- router_context
|
|
- validation
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:bus name="command_bus">
|
|
<framework:middleware id="router_context"/>
|
|
<framework:middleware id="validation"/>
|
|
</framework:bus>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$bus = $messenger->bus('command_bus');
|
|
$bus->middleware()->id('router_context');
|
|
$bus->middleware()->id('validation');
|
|
};
|
|
|
|
Messenger Events
|
|
~~~~~~~~~~~~~~~~
|
|
|
|
In addition to middleware, Messenger also dispatches several events. You can
|
|
:doc:`create an event listener </event_dispatcher>` to hook into various parts
|
|
of the process. For each, the event class is the event name:
|
|
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\SendMessageToTransportsEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerMessageFailedEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerMessageHandledEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerMessageReceivedEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerMessageRetriedEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerRateLimitedEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerRunningEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerStartedEvent`
|
|
* :class:`Symfony\\Component\\Messenger\\Event\\WorkerStoppedEvent`
|
|
|
|
Additional Handler Arguments
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
It's possible to have messenger pass additional data to the message handler
|
|
using the :class:`Symfony\\Component\\Messenger\\Stamp\\HandlerArgumentsStamp`.
|
|
Add this stamp to the envelope in a middleware and fill it with any additional
|
|
data you want to have available in the handler::
|
|
|
|
// src/Messenger/AdditionalArgumentMiddleware.php
|
|
namespace App\Messenger;
|
|
|
|
use Symfony\Component\Messenger\Envelope;
|
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
|
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
|
|
|
|
final class AdditionalArgumentMiddleware implements MiddlewareInterface
|
|
{
|
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
|
{
|
|
$envelope = $envelope->with(new HandlerArgumentsStamp([
|
|
$this->resolveAdditionalArgument($envelope->getMessage()),
|
|
]));
|
|
|
|
return $stack->next()->handle($envelope, $stack);
|
|
}
|
|
|
|
private function resolveAdditionalArgument(object $message): mixed
|
|
{
|
|
// ...
|
|
}
|
|
}
|
|
|
|
Then your handler will look like this::
|
|
|
|
// src/MessageHandler/SmsNotificationHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\SmsNotification;
|
|
|
|
final class SmsNotificationHandler
|
|
{
|
|
public function __invoke(SmsNotification $message, mixed $additionalArgument)
|
|
{
|
|
// ...
|
|
}
|
|
}
|
|
|
|
Message Serializer For Custom Data Formats
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
If you receive messages from other applications, it's possible that they are not
|
|
exactly in the format you need. Not all applications will return a JSON message
|
|
with ``body`` and ``headers`` fields. In those cases, you'll need to create a
|
|
new message serializer implementing the
|
|
:class:`Symfony\\Component\\Messenger\\Transport\\Serialization\\SerializerInterface`.
|
|
Let's say you want to create a message decoder::
|
|
|
|
namespace App\Messenger\Serializer;
|
|
|
|
use Symfony\Component\Messenger\Envelope;
|
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
|
|
|
class MessageWithTokenDecoder implements SerializerInterface
|
|
{
|
|
public function decode(array $encodedEnvelope): Envelope
|
|
{
|
|
try {
|
|
// parse the data you received with your custom fields
|
|
$data = $encodedEnvelope['data'];
|
|
$data['token'] = $encodedEnvelope['token'];
|
|
|
|
// other operations like getting information from stamps
|
|
} catch (\Throwable $throwable) {
|
|
// wrap any exception that may occur in the envelope to send it to the failure transport
|
|
return new Envelope($throwable);
|
|
}
|
|
|
|
return new Envelope($data);
|
|
}
|
|
|
|
public function encode(Envelope $envelope): array
|
|
{
|
|
// this decoder does not encode messages, but you can implement it by returning
|
|
// an array with serialized stamps if you need to send messages in a custom format
|
|
throw new \LogicException('This serializer is only used for decoding messages.');
|
|
}
|
|
}
|
|
|
|
The next step is to tell Symfony to use this serializer in one or more of your
|
|
transports:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/packages/messenger.yaml
|
|
framework:
|
|
messenger:
|
|
transports:
|
|
my_transport:
|
|
dsn: '%env(MY_TRANSPORT_DSN)%'
|
|
serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<framework:messenger>
|
|
<framework:transport name="my_transport" dsn="%env(MY_TRANSPORT_DSN)%" serializer="App\Messenger\Serializer\MessageWithTokenDecoder">
|
|
<!-- ... -->
|
|
</framework:transport>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use App\Messenger\Serializer\MessageWithTokenDecoder;
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
$messenger = $framework->messenger();
|
|
|
|
$messenger->transport('my_transport')
|
|
->dsn('%env(MY_TRANSPORT_DSN)%')
|
|
->serializer(MessageWithTokenDecoder::class);
|
|
};
|
|
|
|
.. _messenger-multiple-buses:
|
|
|
|
Multiple Buses, Command & Event Buses
|
|
-------------------------------------
|
|
|
|
Messenger gives you a single message bus service by default. But, you can configure
|
|
as many as you want, creating "command", "query" or "event" buses and controlling
|
|
their middleware.
|
|
|
|
A common architecture when building applications is to separate commands from
|
|
queries. Commands are actions that do something and queries fetch data. This
|
|
is called CQRS (Command Query Responsibility Segregation). See Martin Fowler's
|
|
`article about CQRS`_ to learn more. This architecture could be used together
|
|
with the Messenger component by defining multiple buses.
|
|
|
|
A **command bus** is a little different from a **query bus**. For example, command
|
|
buses usually don't provide any results and query buses are rarely asynchronous.
|
|
You can configure these buses and their rules by using middleware.
|
|
|
|
It might also be a good idea to separate actions from reactions by introducing
|
|
an **event bus**. The event bus could have zero or more subscribers.
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
framework:
|
|
messenger:
|
|
# The bus that is going to be injected when injecting MessageBusInterface
|
|
default_bus: command.bus
|
|
buses:
|
|
command.bus:
|
|
middleware:
|
|
- validation
|
|
- doctrine_transaction
|
|
query.bus:
|
|
middleware:
|
|
- validation
|
|
event.bus:
|
|
default_middleware:
|
|
enabled: true
|
|
# set "allow_no_handlers" to true (default is false) to allow having
|
|
# no handler configured for this bus without throwing an exception
|
|
allow_no_handlers: false
|
|
# set "allow_no_senders" to false (default is true) to throw an exception
|
|
# if no sender is configured for this bus
|
|
allow_no_senders: true
|
|
middleware:
|
|
- validation
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/packages/messenger.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xmlns:framework="http://symfony.com/schema/dic/symfony"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd
|
|
http://symfony.com/schema/dic/symfony
|
|
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
|
|
|
|
<framework:config>
|
|
<!-- The bus that is going to be injected when injecting MessageBusInterface -->
|
|
<framework:messenger default-bus="command.bus">
|
|
<framework:bus name="command.bus">
|
|
<framework:middleware id="validation"/>
|
|
<framework:middleware id="doctrine_transaction"/>
|
|
</framework:bus>
|
|
<framework:bus name="query.bus">
|
|
<framework:middleware id="validation"/>
|
|
</framework:bus>
|
|
<framework:bus name="event.bus">
|
|
<!-- set "allow-no-handlers" to true (default is false) to allow having
|
|
no handler configured for this bus without throwing an exception -->
|
|
<!-- set "allow-no-senders" to false (default is true) to throw an exception
|
|
if no sender is configured for this bus -->
|
|
<framework:default-middleware enabled="true" allow-no-handlers="false" allow-no-senders="true"/>
|
|
<framework:middleware id="validation"/>
|
|
</framework:bus>
|
|
</framework:messenger>
|
|
</framework:config>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/packages/messenger.php
|
|
use Symfony\Config\FrameworkConfig;
|
|
|
|
return static function (FrameworkConfig $framework): void {
|
|
// The bus that is going to be injected when injecting MessageBusInterface
|
|
$framework->messenger()->defaultBus('command.bus');
|
|
|
|
$commandBus = $framework->messenger()->bus('command.bus');
|
|
$commandBus->middleware()->id('validation');
|
|
$commandBus->middleware()->id('doctrine_transaction');
|
|
|
|
$queryBus = $framework->messenger()->bus('query.bus');
|
|
$queryBus->middleware()->id('validation');
|
|
|
|
$eventBus = $framework->messenger()->bus('event.bus');
|
|
$eventBus->defaultMiddleware()
|
|
->enabled(true)
|
|
// set "allowNoHandlers" to true (default is false) to allow having
|
|
// no handler configured for this bus without throwing an exception
|
|
->allowNoHandlers(false)
|
|
// set "allowNoSenders" to false (default is true) to throw an exception
|
|
// if no sender is configured for this bus
|
|
->allowNoSenders(true)
|
|
;
|
|
$eventBus->middleware()->id('validation');
|
|
};
|
|
|
|
This will create three new services:
|
|
|
|
* ``command.bus``: autowireable with the :class:`Symfony\\Component\\Messenger\\MessageBusInterface`
|
|
type-hint (because this is the ``default_bus``);
|
|
|
|
* ``query.bus``: autowireable with ``MessageBusInterface $queryBus``;
|
|
|
|
* ``event.bus``: autowireable with ``MessageBusInterface $eventBus``.
|
|
|
|
Restrict Handlers per Bus
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
By default, each handler will be available to handle messages on *all*
|
|
of your buses. To prevent dispatching a message to the wrong bus without an error,
|
|
you can restrict each handler to a specific bus using the ``messenger.message_handler`` tag:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/services.yaml
|
|
services:
|
|
App\MessageHandler\SomeCommandHandler:
|
|
tags: [{ name: messenger.message_handler, bus: command.bus }]
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/services.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd">
|
|
|
|
<services>
|
|
<service id="App\MessageHandler\SomeCommandHandler">
|
|
<tag name="messenger.message_handler" bus="command.bus"/>
|
|
</service>
|
|
</services>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/services.php
|
|
$container->services()
|
|
->set(App\MessageHandler\SomeCommandHandler::class)
|
|
->tag('messenger.message_handler', ['bus' => 'command.bus']);
|
|
|
|
This way, the ``App\MessageHandler\SomeCommandHandler`` handler will only be
|
|
known by the ``command.bus`` bus.
|
|
|
|
You can also automatically add this tag to a number of classes by using
|
|
the :ref:`_instanceof service configuration <di-instanceof>`. Using this,
|
|
you can determine the message bus based on an implemented interface:
|
|
|
|
.. configuration-block::
|
|
|
|
.. code-block:: yaml
|
|
|
|
# config/services.yaml
|
|
services:
|
|
# ...
|
|
|
|
_instanceof:
|
|
# all services implementing the CommandHandlerInterface
|
|
# will be registered on the command.bus bus
|
|
App\MessageHandler\CommandHandlerInterface:
|
|
tags:
|
|
- { name: messenger.message_handler, bus: command.bus }
|
|
|
|
# while those implementing QueryHandlerInterface will be
|
|
# registered on the query.bus bus
|
|
App\MessageHandler\QueryHandlerInterface:
|
|
tags:
|
|
- { name: messenger.message_handler, bus: query.bus }
|
|
|
|
.. code-block:: xml
|
|
|
|
<!-- config/services.xml -->
|
|
<?xml version="1.0" encoding="UTF-8" ?>
|
|
<container xmlns="http://symfony.com/schema/dic/services"
|
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
xsi:schemaLocation="http://symfony.com/schema/dic/services
|
|
https://symfony.com/schema/dic/services/services-1.0.xsd">
|
|
|
|
<services>
|
|
<!-- ... -->
|
|
|
|
<!-- all services implementing the CommandHandlerInterface
|
|
will be registered on the command.bus bus -->
|
|
<instanceof id="App\MessageHandler\CommandHandlerInterface">
|
|
<tag name="messenger.message_handler" bus="command.bus"/>
|
|
</instanceof>
|
|
|
|
<!-- while those implementing QueryHandlerInterface will be
|
|
registered on the query.bus bus -->
|
|
<instanceof id="App\MessageHandler\QueryHandlerInterface">
|
|
<tag name="messenger.message_handler" bus="query.bus"/>
|
|
</instanceof>
|
|
</services>
|
|
</container>
|
|
|
|
.. code-block:: php
|
|
|
|
// config/services.php
|
|
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
|
|
|
|
use App\MessageHandler\CommandHandlerInterface;
|
|
use App\MessageHandler\QueryHandlerInterface;
|
|
|
|
return function(ContainerConfigurator $container): void {
|
|
$services = $container->services();
|
|
|
|
// ...
|
|
|
|
// all services implementing the CommandHandlerInterface
|
|
// will be registered on the command.bus bus
|
|
$services->instanceof(CommandHandlerInterface::class)
|
|
->tag('messenger.message_handler', ['bus' => 'command.bus']);
|
|
|
|
// while those implementing QueryHandlerInterface will be
|
|
// registered on the query.bus bus
|
|
$services->instanceof(QueryHandlerInterface::class)
|
|
->tag('messenger.message_handler', ['bus' => 'query.bus']);
|
|
};
|
|
|
|
Debugging the Buses
|
|
~~~~~~~~~~~~~~~~~~~
|
|
|
|
The ``debug:messenger`` command lists available messages & handlers per bus.
|
|
You can also restrict the list to a specific bus by providing its name as an argument.
|
|
|
|
.. code-block:: terminal
|
|
|
|
$ php bin/console debug:messenger
|
|
|
|
Messenger
|
|
=========
|
|
|
|
command.bus
|
|
-----------
|
|
|
|
The following messages can be dispatched:
|
|
|
|
---------------------------------------------------------------------------------------
|
|
App\Message\DummyCommand
|
|
handled by App\MessageHandler\DummyCommandHandler
|
|
App\Message\MultipleBusesMessage
|
|
handled by App\MessageHandler\MultipleBusesMessageHandler
|
|
---------------------------------------------------------------------------------------
|
|
|
|
query.bus
|
|
---------
|
|
|
|
The following messages can be dispatched:
|
|
|
|
---------------------------------------------------------------------------------------
|
|
App\Message\DummyQuery
|
|
handled by App\MessageHandler\DummyQueryHandler
|
|
App\Message\MultipleBusesMessage
|
|
handled by App\MessageHandler\MultipleBusesMessageHandler
|
|
---------------------------------------------------------------------------------------
|
|
|
|
.. tip::
|
|
|
|
The command will also show the PHPDoc description of the message and handler classes.
|
|
|
|
Redispatching a Message
|
|
-----------------------
|
|
|
|
If you want to redispatch a message (using the same transport and envelope), create
|
|
a new :class:`Symfony\\Component\\Messenger\\Message\\RedispatchMessage` and dispatch
|
|
it through your bus. Reusing the same ``SmsNotification`` example shown earlier::
|
|
|
|
// src/MessageHandler/SmsNotificationHandler.php
|
|
namespace App\MessageHandler;
|
|
|
|
use App\Message\SmsNotification;
|
|
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
|
use Symfony\Component\Messenger\Message\RedispatchMessage;
|
|
use Symfony\Component\Messenger\MessageBusInterface;
|
|
|
|
#[AsMessageHandler]
|
|
class SmsNotificationHandler
|
|
{
|
|
public function __construct(private MessageBusInterface $bus)
|
|
{
|
|
}
|
|
|
|
public function __invoke(SmsNotification $message): void
|
|
{
|
|
// do something with the message
|
|
// then redispatch it based on your own logic
|
|
|
|
if ($needsRedispatch) {
|
|
$this->bus->dispatch(new RedispatchMessage($message));
|
|
}
|
|
}
|
|
}
|
|
|
|
The built-in :class:`Symfony\\Component\\Messenger\\Handler\\RedispatchMessageHandler`
|
|
will take care of this message to redispatch it through the same bus it was
|
|
dispatched at first. You can also use the second argument of the ``RedispatchMessage``
|
|
constructor to provide transports to use when redispatching the message.
|
|
|
|
Learn more
|
|
----------
|
|
|
|
.. toctree::
|
|
:maxdepth: 1
|
|
:glob:
|
|
|
|
/messenger/*
|
|
|
|
.. _`Enqueue's transport`: https://github.com/sroze/messenger-enqueue-transport
|
|
.. _`streams`: https://redis.io/topics/streams-intro
|
|
.. _`Supervisor docs`: http://supervisord.org/
|
|
.. _`PCNTL`: https://www.php.net/manual/book.pcntl.php
|
|
.. _`systemd docs`: https://systemd.io/
|
|
.. _`SymfonyCasts' message serializer tutorial`: https://symfonycasts.com/screencast/messenger/transport-serializer
|
|
.. _`Long polling`: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
|
|
.. _`Visibility Timeout`: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
|
|
.. _`FIFO queue`: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html
|
|
.. _`LISTEN/NOTIFY`: https://www.postgresql.org/docs/current/sql-notify.html
|
|
.. _`AMQProxy`: https://github.com/cloudamqp/amqproxy
|
|
.. _`high connection churn`: https://www.rabbitmq.com/connections.html#high-connection-churn
|
|
.. _`article about CQRS`: https://martinfowler.com/bliki/CQRS.html
|
|
.. _`SSL context options`: https://php.net/context.ssl
|
|
.. _`predefined constants`: https://www.php.net/pcntl.constants
|
|
.. _`SQS CreateQueue API`: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html
|