[Messenger] Route decode failures through failure handling

This commit is contained in:
Nicolas Grekas
2025-12-29 14:53:05 +01:00
parent a365ef0c11
commit 71b5f9c08f
3 changed files with 25 additions and 31 deletions

View File

@@ -40,20 +40,20 @@ class AmazonSqsReceiverTest extends TestCase
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
public function testItReturnsSerializedEnvelopeWhenDecodingFails()
{
$this->expectException(MessageDecodingFailedException::class);
$serializer = $this->createStub(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->createMock(Connection::class);
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($sqsEnvelop);
$connection->expects($this->once())->method('reject');
$receiver = new AmazonSqsReceiver($connection, $serializer);
iterator_to_array($receiver->get());
$envelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $envelopes);
$this->assertInstanceOf(MessageDecodingFailedException::class, $envelopes[0]->getMessage());
}
public function testKeepalive()

View File

@@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -38,32 +39,32 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
public function get(): iterable
{
try {
$sqsEnvelope = $this->connection->get();
if (!$sqsEnvelope = $this->connection->get()) {
return;
}
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (null === $sqsEnvelope) {
return;
}
$stamps = [
new AmazonSqsReceivedStamp($sqsEnvelope['id']),
new TransportMessageIdStamp($sqsEnvelope['id']),
];
try {
$envelope = $this->serializer->decode([
yield $this->serializer->decode($sqsEnvelope = [
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($sqsEnvelope['id']);
throw $exception;
])->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
}
yield $envelope->with(new AmazonSqsReceivedStamp($sqsEnvelope['id']));
}
public function ack(Envelope $envelope): void
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
$this->connection->delete($this->findSqsReceivedStampId($envelope));
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
@@ -72,7 +73,7 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
public function reject(Envelope $envelope): void
{
try {
$this->connection->reject($this->findSqsReceivedStamp($envelope)->getId());
$this->connection->reject($this->findSqsReceivedStampId($envelope));
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
@@ -81,7 +82,7 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
try {
$this->connection->keepalive($this->findSqsReceivedStamp($envelope)->getId(), $seconds);
$this->connection->keepalive($this->findSqsReceivedStampId($envelope), $seconds);
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
@@ -96,15 +97,8 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
}
}
private function findSqsReceivedStamp(Envelope $envelope): AmazonSqsReceivedStamp
private function findSqsReceivedStampId(Envelope $envelope): string
{
/** @var AmazonSqsReceivedStamp|null $sqsReceivedStamp */
$sqsReceivedStamp = $envelope->last(AmazonSqsReceivedStamp::class);
if (null === $sqsReceivedStamp) {
throw new LogicException('No AmazonSqsReceivedStamp found on the Envelope.');
}
return $sqsReceivedStamp;
return $envelope->last(AmazonSqsReceivedStamp::class)?->getId() ?? throw new LogicException('No AmazonSqsReceivedStamp found on the Envelope.');
}
}

View File

@@ -20,7 +20,7 @@
"async-aws/core": "^1.7",
"async-aws/sqs": "^1.0|^2.0",
"psr/log": "^1|^2|^3",
"symfony/messenger": "^7.4|^8.0",
"symfony/messenger": "^8.1",
"symfony/service-contracts": "^2.5|^3"
},
"require-dev": {