From 5543eaf1de9b2ea3da9d65c3cf09b3319dcb2fec Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Mon, 29 Dec 2025 14:53:05 +0100 Subject: [PATCH] [Messenger] Route decode failures through failure handling --- Tests/Transport/RedisReceiverTest.php | 10 +++++----- Transport/RedisReceiver.php | 24 +++++++++++++----------- composer.json | 2 +- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/Tests/Transport/RedisReceiverTest.php b/Tests/Transport/RedisReceiverTest.php index 801bcc6..05f31c3 100644 --- a/Tests/Transport/RedisReceiverTest.php +++ b/Tests/Transport/RedisReceiverTest.php @@ -53,17 +53,17 @@ class RedisReceiverTest extends TestCase #[DataProvider('rejectedRedisEnvelopeProvider')] public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException(array $redisEnvelope) { - $this->expectException(MessageDecodingFailedException::class); - $serializer = $this->createStub(PhpSerializer::class); $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); - $connection = $this->createMock(Connection::class); + $connection = $this->createStub(Connection::class); $connection->method('get')->willReturn($redisEnvelope); - $connection->expects($this->once())->method('reject'); $receiver = new RedisReceiver($connection, $serializer); - $receiver->get(); + $envelopes = $receiver->get(); + + $this->assertCount(1, $envelopes); + $this->assertInstanceOf(MessageDecodingFailedException::class, $envelopes[0]->getMessage()); } public static function redisEnvelopeProvider(): \Generator diff --git a/Transport/RedisReceiver.php b/Transport/RedisReceiver.php index ff7aa38..15a1e4f 100644 --- a/Transport/RedisReceiver.php +++ b/Transport/RedisReceiver.php @@ -59,27 +59,29 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte return []; } + $stamps = [ + new RedisReceivedStamp($message['id']), + new TransportMessageIdStamp($message['id']), + ]; + try { if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) { - $envelope = $this->serializer->decode([ + $envelope = $this->serializer->decode($redisEnvelope = [ 'body' => $redisEnvelope['body'], 'headers' => $redisEnvelope['headers'], ]); } else { $envelope = $this->serializer->decode($redisEnvelope); } - } catch (MessageDecodingFailedException $exception) { - $this->connection->reject($message['id']); - - throw $exception; + } catch (MessageDecodingFailedException $e) { + return [ + MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps), + ]; } - return [$envelope - ->withoutAll(TransportMessageIdStamp::class) - ->with( - new RedisReceivedStamp($message['id']), - new TransportMessageIdStamp($message['id']) - )]; + return [ + $envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps), + ]; } public function ack(Envelope $envelope): void diff --git a/composer.json b/composer.json index 8572797..237a44c 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,7 @@ "require": { "php": ">=8.4", "ext-redis": "*", - "symfony/messenger": "^7.4|^8.0" + "symfony/messenger": "^8.1" }, "require-dev": { "symfony/property-access": "^7.4|^8.0",