[Messenger] Add MessageExecutionStrategyInterface and refactor Worker to use it

This commit is contained in:
Nicolas Grekas
2026-03-15 19:25:23 +01:00
parent 754e299ec9
commit ce5ffb39bb
9 changed files with 306 additions and 46 deletions

View File

@@ -297,7 +297,7 @@ class ConnectionTest extends TestCase
$series = [
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'VisibilityTimeout' => null,
'MaxNumberOfMessages' => 12,
'MaxNumberOfMessages' => 10,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 20]], $firstResult],
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',

View File

@@ -10,6 +10,7 @@ CHANGELOG
* Add an idle timeout option to the `BatchHandlerTrait`
* Add argument `$fetchSize` to `ReceiverInterface::get()` and `QueueReceiverInterface::getFromQueues()`, and to all bridges
* Add a `--fetch-size` option to the `messenger:consume` command to control how many messages are fetched per iteration
* Add `MessageExecutionStrategyInterface` and `SyncMessageExecutionStrategy` to decouple message execution from the `Worker`
8.0
---

View File

@@ -0,0 +1,67 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Execution;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Execution\Message\DeferredBatchMessage;
/**
* @internal
*/
final class DeferredBatchMessageQueue
{
/** @var \SplObjectStorage<object, DeferredBatchMessage>|null */
private ?\SplObjectStorage $messages = null;
public function hasPending(): bool
{
return null !== $this->messages;
}
public function add(object $batchHandler, string $transportName, Envelope $envelope, bool &$acked, float $queuedAt): void
{
$this->messages ??= new \SplObjectStorage();
$this->messages[$batchHandler] = new DeferredBatchMessage($transportName, $envelope, $acked, $queuedAt);
}
/**
* @return \SplObjectStorage<object, DeferredBatchMessage>
*/
public function popFlushable(bool|float $force, float $now): \SplObjectStorage
{
if (!$this->messages) {
return new \SplObjectStorage();
}
if (\is_bool($force)) {
$messages = $this->messages;
$this->messages = null;
return $messages;
}
$remaining = new \SplObjectStorage();
$flushable = new \SplObjectStorage();
foreach ($this->messages as $handler) {
if ($force <= $now - $this->messages[$handler]->queuedAt) {
$flushable[$handler] = $this->messages[$handler];
} else {
$remaining[$handler] = $this->messages[$handler];
}
}
$this->messages = $remaining->count() ? $remaining : null;
return $flushable;
}
}

View File

@@ -0,0 +1,31 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Execution\Message;
use Symfony\Component\Messenger\Envelope;
/**
* @internal
*/
final class DeferredBatchMessage
{
public bool $acked;
public function __construct(
public readonly string $transportName,
public readonly Envelope $envelope,
bool &$acked,
public readonly float $queuedAt,
) {
$this->acked = &$acked;
}
}

View File

@@ -0,0 +1,36 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Execution;
use Symfony\Component\Messenger\Envelope;
interface MessageExecutionStrategyInterface
{
/**
* @param callable(Envelope, string, bool, ?\Throwable): void $onHandled
*/
public function execute(Envelope $envelope, string $transportName, callable $onHandled): void;
public function shouldPauseConsumption(): bool;
/**
* @param callable(Envelope, string, bool, ?\Throwable): void $onHandled
*/
public function wait(callable $onHandled): bool;
/**
* @param callable(Envelope, string, bool, ?\Throwable): void $onHandled
*/
public function flush(callable $onHandled, bool|float $force = false): bool;
public function shutdown(): void;
}

View File

@@ -0,0 +1,64 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Execution;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\AckStamp;
final class SyncMessageExecutionStrategy implements MessageExecutionStrategyInterface
{
public function __construct(
private readonly MessageBusInterface $bus,
private readonly \Closure $onAcknowledge,
) {
}
public function execute(Envelope $envelope, string $transportName, callable $onHandled): void
{
$acked = false;
$error = null;
$ack = function (Envelope $handledEnvelope, ?\Throwable $handledError = null) use (&$envelope, &$acked, &$error, $transportName): void {
$envelope = $handledEnvelope;
$acked = true;
$error = $handledError;
($this->onAcknowledge)($transportName, $handledEnvelope, $handledError);
};
try {
$envelope = $this->bus->dispatch($envelope->with(new AckStamp($ack)));
$onHandled($envelope, $transportName, $acked, $error);
} catch (\Throwable $e) {
$onHandled($envelope, $transportName, $acked, $e);
}
}
public function shouldPauseConsumption(): bool
{
return false;
}
public function wait(callable $onHandled): bool
{
return false;
}
public function flush(callable $onHandled, bool|float $force = false): bool
{
return false;
}
public function shutdown(): void
{
}
}

View File

@@ -0,0 +1,63 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Execution;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Execution\DeferredBatchMessageQueue;
use Symfony\Component\Messenger\Execution\Message\DeferredBatchMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
class DeferredBatchMessageQueueTest extends TestCase
{
public function testItReturnsAllEntriesWhenForceFlushing()
{
$queue = new DeferredBatchMessageQueue();
$batchHandler = new \stdClass();
$acked = false;
$envelope = new Envelope(new DummyMessage('Hello'));
$queue->add($batchHandler, 7, $envelope, $acked, 10.0);
$flushable = $queue->popFlushable(true, 10.0);
$this->assertCount(1, $flushable);
$this->assertFalse($queue->hasPending());
$this->assertEquals(new DeferredBatchMessage(7, $envelope, $acked, 10.0), $flushable[$batchHandler]);
}
public function testItKeepsRecentEntriesWhenFlushingByIdleTimeout()
{
$queue = new DeferredBatchMessageQueue();
$readyHandler = new \stdClass();
$waitingHandler = new \stdClass();
$readyAcked = false;
$waitingAcked = false;
$readyEnvelope = new Envelope(new DummyMessage('ready'));
$waitingEnvelope = new Envelope(new DummyMessage('waiting'));
$queue->add($readyHandler, 1, $readyEnvelope, $readyAcked, 10.0);
$queue->add($waitingHandler, 2, $waitingEnvelope, $waitingAcked, 19.5);
$flushable = $queue->popFlushable(5.0, 20.0);
$this->assertCount(1, $flushable);
$this->assertTrue($queue->hasPending());
$this->assertEquals(new DeferredBatchMessage(1, $readyEnvelope, $readyAcked, 10.0), $flushable[$readyHandler]);
$remaining = $queue->popFlushable(true, 20.0);
$this->assertCount(1, $remaining);
$this->assertFalse($queue->hasPending());
$this->assertEquals(new DeferredBatchMessage(2, $waitingEnvelope, $waitingAcked, 19.5), $remaining[$waitingHandler]);
}
}

View File

@@ -32,6 +32,7 @@ use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Execution\DeferredBatchMessageQueue;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
@@ -814,8 +815,9 @@ class WorkerTest extends TestCase
$dummyHandler = new DummyBatchHandler();
$envelopeWithNoAutoAck = $envelope->with(new NoAutoAckStamp(new HandlerDescriptor($dummyHandler)));
$unacks = new \SplObjectStorage();
$unacks[$dummyHandler] = [$envelopeWithNoAutoAck, 'transport', false, 0];
$unacks = new DeferredBatchMessageQueue();
$acked = false;
$unacks->add($dummyHandler, 'transport', $envelopeWithNoAutoAck, $acked, 0.0);
(new \ReflectionProperty($worker, 'unacks'))->setValue($worker, $unacks);
$worker->run();

View File

@@ -25,6 +25,9 @@ use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\EnvelopeAwareExceptionInterface;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Execution\DeferredBatchMessageQueue;
use Symfony\Component\Messenger\Execution\MessageExecutionStrategyInterface;
use Symfony\Component\Messenger\Execution\SyncMessageExecutionStrategy;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
@@ -47,12 +50,14 @@ class Worker
private bool $shouldStop = false;
private WorkerMetadata $metadata;
private array $acks = [];
private ?\SplObjectStorage $unacks = null;
private ?DeferredBatchMessageQueue $unacks = null;
/**
* @var \SplObjectStorage<object, array{0: string, 1: Envelope}>
*/
private \SplObjectStorage $keepalives;
private readonly MessageExecutionStrategyInterface $messageExecutionStrategy;
/**
* @param ReceiverInterface[] $receivers Where the key is the transport name
*/
@@ -63,11 +68,13 @@ class Worker
private ?LoggerInterface $logger = null,
private ?array $rateLimiters = null,
private ClockInterface $clock = new Clock(),
?MessageExecutionStrategyInterface $messageExecutionStrategy = null,
) {
$this->metadata = new WorkerMetadata([
'transportNames' => array_keys($receivers),
]);
$this->keepalives = new \SplObjectStorage();
$this->messageExecutionStrategy = $messageExecutionStrategy ?? new SyncMessageExecutionStrategy($this->bus, $this->enqueueAck(...));
}
/**
@@ -161,25 +168,27 @@ class Worker
return;
}
$acked = false;
$ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) {
$acked = true;
$this->acks[] = [$transportName, $envelope, $e];
};
$this->messageExecutionStrategy->execute(
$envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()),
$transportName,
$this->preAck(...),
);
}
try {
$e = null;
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack)));
} catch (\Throwable $e) {
}
private function enqueueAck(string $transportName, Envelope $envelope, ?\Throwable $e = null): void
{
$this->acks[] = [$transportName, $envelope, $e];
}
private function preAck(Envelope $envelope, string $transportName, bool &$acked, ?\Throwable $e = null): void
{
$noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);
if (!$acked && !$noAutoAckStamp) {
$this->acks[] = [$transportName, $envelope, $e];
} elseif ($noAutoAckStamp) {
$this->unacks ??= new \SplObjectStorage();
$this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName, &$acked, $this->clock->now()->format('U.u')];
$this->unacks ??= new DeferredBatchMessageQueue();
$this->unacks->add($noAutoAckStamp->getHandlerDescriptor()->getBatchHandler(), $transportName, $envelope->withoutAll(AckStamp::class), $acked, (float) $this->clock->now()->format('U.u'));
}
$this->ack();
@@ -263,39 +272,25 @@ class Worker
private function flush(bool|float $force): bool
{
if (!$this->unacks) {
return false;
$flushed = $this->messageExecutionStrategy->flush($this->preAck(...), $force);
if (!$this->unacks?->hasPending()) {
return $flushed;
}
if (\is_bool($force)) {
$unacks = $this->unacks;
$this->unacks = null;
} else {
$now = $this->clock->now()->format('U.u');
$remaining = new \SplObjectStorage();
$unacks = new \SplObjectStorage();
foreach ($this->unacks as $handler) {
if ($force <= $now - $this->unacks[$handler][3]) {
$unacks[$handler] = $this->unacks[$handler];
} else {
$remaining[$handler] = $this->unacks[$handler];
}
}
$this->unacks = $remaining->count() ? $remaining : null;
$force = true;
}
$unacks = $this->unacks->popFlushable($force, (float) $this->clock->now()->format('U.u'));
if (!$unacks->count()) {
return false;
return $flushed;
}
foreach ($unacks as $handler) {
[$envelope, $transportName, $acked] = $unacks[$handler];
$deferredMessage = $unacks[$handler];
$transportName = $deferredMessage->transportName;
$envelope = $deferredMessage->envelope;
try {
$e = null;
$this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
$this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp(true === $force || !\is_bool($force))));
} catch (\Throwable $e) {
$envelope = $envelope->withoutAll(NoAutoAckStamp::class);
$this->acks[] = [$transportName, $envelope, $e];
@@ -304,15 +299,15 @@ class Worker
$noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);
if (!$acked && !$noAutoAckStamp) {
if (!$deferredMessage->acked && !$noAutoAckStamp) {
$this->acks[] = [$transportName, $envelope, $e];
} elseif ($noAutoAckStamp) {
$this->unacks ??= new \SplObjectStorage();
$this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName, &$acked, $this->clock->now()->format('U.u')];
$this->unacks ??= new DeferredBatchMessageQueue();
$this->unacks->add($noAutoAckStamp->getHandlerDescriptor()->getBatchHandler(), $transportName, $envelope->withoutAll(AckStamp::class), $deferredMessage->acked, (float) $this->clock->now()->format('U.u'));
}
}
return $this->ack();
return $this->ack() || $flushed;
}
public function stop(): void
@@ -326,8 +321,9 @@ class Worker
{
foreach ($this->keepalives as $message) {
[$transportName, $envelope] = $this->keepalives[$message];
$receiver = $this->receivers[$transportName];
if (!$this->receivers[$transportName] instanceof KeepaliveReceiverInterface) {
if (!$receiver instanceof KeepaliveReceiverInterface) {
throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, KeepaliveReceiverInterface::class));
}
@@ -335,7 +331,7 @@ class Worker
'transport' => $transportName,
'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(),
]);
$this->receivers[$transportName]->keepalive($envelope, $seconds);
$receiver->keepalive($envelope, $seconds);
}
}