[Messenger] Add AmqpPriorityStamp for per-message priority on AMQP transport

This commit is contained in:
Valentin Nazarov
2026-02-24 19:39:49 +01:00
committed by Nicolas Grekas
parent 6903a715af
commit 23991d4abd
5 changed files with 71 additions and 9 deletions

View File

@@ -47,7 +47,7 @@ $worker = new Worker(['the_receiver' => $receiver], new class implements Message
}
}, $eventDispatcher);
pcntl_signal(15, fn () => $worker->stop());
pcntl_signal(15, static fn () => $worker->stop());
echo "Receiving messages...\n";
$worker->run();

View File

@@ -14,12 +14,14 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Transport;
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpPriorityStamp;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
@@ -73,6 +75,36 @@ class AmqpSenderTest extends TestCase
$sender->send($envelope);
}
public function testItSendsWithDelay()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(1000));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 1000);
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
public function testItSendsWithPriority()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpPriorityStamp(255));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $this->callback(static fn (AmqpStamp $stamp) => 255 === $stamp->getAttributes()['priority']));
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
public function testContentTypeHeaderIsMovedToAttribute()
{
$envelope = new Envelope(new DummyMessage('Oy'));

View File

@@ -0,0 +1,29 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* Apply this stamp to set the per-message priority on an AMQP queue.
*
* The queue MUST be declared with the "x-max-priority" argument for the broker to honour this value.
*
* @author Valentin Nazarov <i.kozlice@protonmail.com>
*/
final class AmqpPriorityStamp implements StampInterface
{
public function __construct(
public readonly int $priority,
) {
}
}

View File

@@ -41,27 +41,27 @@ class AmqpSender implements SenderInterface
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
if ($contentType = $encodedMessage['headers']['Content-Type'] ?? null) {
unset($encodedMessage['headers']['Content-Type']);
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
if (!($amqpStamp?->getAttributes()['content_type'] ?? null)) {
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
}
}
if ($amqpStamp instanceof AmqpStamp && isset($amqpStamp->getAttributes()['message_id'])) {
if ($priorityStamp = $envelope->last(AmqpPriorityStamp::class)) {
$amqpStamp = AmqpStamp::createWithAttributes(['priority' => $priorityStamp->priority], $amqpStamp);
}
if ($amqpStamp?->getAttributes()['message_id'] ?? null) {
$envelope = $envelope->with(new TransportMessageIdStamp($amqpStamp->getAttributes()['message_id']));
}
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
if ($amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class)) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
$amqpReceivedStamp->getAmqpEnvelope(),
$amqpStamp,

View File

@@ -12,6 +12,7 @@ CHANGELOG
* Add a `--fetch-size` option to the `messenger:consume` command to control how many messages are fetched per iteration
* Add `MessageExecutionStrategyInterface` and `SyncMessageExecutionStrategy` to decouple message execution from the `Worker`
* Allow configuring the service reset interval in the `messenger:consume` command via the `--no-reset` option
* Add `AmqpPriorityStamp` to set per-message priority on the AMQP transport
8.0
---