mirror of
https://github.com/symfony/symfony.git
synced 2026-03-24 00:32:15 +01:00
[Messenger] Add AmqpPriorityStamp for per-message priority on AMQP transport
This commit is contained in:
committed by
Nicolas Grekas
parent
6903a715af
commit
23991d4abd
@@ -47,7 +47,7 @@ $worker = new Worker(['the_receiver' => $receiver], new class implements Message
|
||||
}
|
||||
}, $eventDispatcher);
|
||||
|
||||
pcntl_signal(15, fn () => $worker->stop());
|
||||
pcntl_signal(15, static fn () => $worker->stop());
|
||||
|
||||
echo "Receiving messages...\n";
|
||||
$worker->run();
|
||||
|
||||
@@ -14,12 +14,14 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Transport;
|
||||
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpPriorityStamp;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
|
||||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
|
||||
@@ -73,6 +75,36 @@ class AmqpSenderTest extends TestCase
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
public function testItSendsWithDelay()
|
||||
{
|
||||
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(1000));
|
||||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
||||
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 1000);
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
public function testItSendsWithPriority()
|
||||
{
|
||||
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpPriorityStamp(255));
|
||||
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
|
||||
|
||||
$serializer = $this->createMock(SerializerInterface::class);
|
||||
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
|
||||
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $this->callback(static fn (AmqpStamp $stamp) => 255 === $stamp->getAttributes()['priority']));
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
$sender->send($envelope);
|
||||
}
|
||||
|
||||
public function testContentTypeHeaderIsMovedToAttribute()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Oy'));
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
|
||||
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
|
||||
/**
|
||||
* Apply this stamp to set the per-message priority on an AMQP queue.
|
||||
*
|
||||
* The queue MUST be declared with the "x-max-priority" argument for the broker to honour this value.
|
||||
*
|
||||
* @author Valentin Nazarov <i.kozlice@protonmail.com>
|
||||
*/
|
||||
final class AmqpPriorityStamp implements StampInterface
|
||||
{
|
||||
public function __construct(
|
||||
public readonly int $priority,
|
||||
) {
|
||||
}
|
||||
}
|
||||
@@ -41,27 +41,27 @@ class AmqpSender implements SenderInterface
|
||||
{
|
||||
$encodedMessage = $this->serializer->encode($envelope);
|
||||
|
||||
/** @var DelayStamp|null $delayStamp */
|
||||
$delayStamp = $envelope->last(DelayStamp::class);
|
||||
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
|
||||
|
||||
/** @var AmqpStamp|null $amqpStamp */
|
||||
$amqpStamp = $envelope->last(AmqpStamp::class);
|
||||
if (isset($encodedMessage['headers']['Content-Type'])) {
|
||||
$contentType = $encodedMessage['headers']['Content-Type'];
|
||||
if ($contentType = $encodedMessage['headers']['Content-Type'] ?? null) {
|
||||
unset($encodedMessage['headers']['Content-Type']);
|
||||
|
||||
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
|
||||
if (!($amqpStamp?->getAttributes()['content_type'] ?? null)) {
|
||||
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
|
||||
}
|
||||
}
|
||||
|
||||
if ($amqpStamp instanceof AmqpStamp && isset($amqpStamp->getAttributes()['message_id'])) {
|
||||
if ($priorityStamp = $envelope->last(AmqpPriorityStamp::class)) {
|
||||
$amqpStamp = AmqpStamp::createWithAttributes(['priority' => $priorityStamp->priority], $amqpStamp);
|
||||
}
|
||||
|
||||
if ($amqpStamp?->getAttributes()['message_id'] ?? null) {
|
||||
$envelope = $envelope->with(new TransportMessageIdStamp($amqpStamp->getAttributes()['message_id']));
|
||||
}
|
||||
|
||||
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
|
||||
if ($amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class)) {
|
||||
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
|
||||
$amqpReceivedStamp->getAmqpEnvelope(),
|
||||
$amqpStamp,
|
||||
|
||||
@@ -12,6 +12,7 @@ CHANGELOG
|
||||
* Add a `--fetch-size` option to the `messenger:consume` command to control how many messages are fetched per iteration
|
||||
* Add `MessageExecutionStrategyInterface` and `SyncMessageExecutionStrategy` to decouple message execution from the `Worker`
|
||||
* Allow configuring the service reset interval in the `messenger:consume` command via the `--no-reset` option
|
||||
* Add `AmqpPriorityStamp` to set per-message priority on the AMQP transport
|
||||
|
||||
8.0
|
||||
---
|
||||
|
||||
Reference in New Issue
Block a user