mirror of
https://github.com/symfony/redis-messenger.git
synced 2026-03-24 01:12:15 +01:00
[Messenger] Route decode failures through failure handling
This commit is contained in:
@@ -53,17 +53,17 @@ class RedisReceiverTest extends TestCase
|
||||
#[DataProvider('rejectedRedisEnvelopeProvider')]
|
||||
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException(array $redisEnvelope)
|
||||
{
|
||||
$this->expectException(MessageDecodingFailedException::class);
|
||||
|
||||
$serializer = $this->createStub(PhpSerializer::class);
|
||||
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
|
||||
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection = $this->createStub(Connection::class);
|
||||
$connection->method('get')->willReturn($redisEnvelope);
|
||||
$connection->expects($this->once())->method('reject');
|
||||
|
||||
$receiver = new RedisReceiver($connection, $serializer);
|
||||
$receiver->get();
|
||||
$envelopes = $receiver->get();
|
||||
|
||||
$this->assertCount(1, $envelopes);
|
||||
$this->assertInstanceOf(MessageDecodingFailedException::class, $envelopes[0]->getMessage());
|
||||
}
|
||||
|
||||
public static function redisEnvelopeProvider(): \Generator
|
||||
|
||||
@@ -59,27 +59,29 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
|
||||
return [];
|
||||
}
|
||||
|
||||
$stamps = [
|
||||
new RedisReceivedStamp($message['id']),
|
||||
new TransportMessageIdStamp($message['id']),
|
||||
];
|
||||
|
||||
try {
|
||||
if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) {
|
||||
$envelope = $this->serializer->decode([
|
||||
$envelope = $this->serializer->decode($redisEnvelope = [
|
||||
'body' => $redisEnvelope['body'],
|
||||
'headers' => $redisEnvelope['headers'],
|
||||
]);
|
||||
} else {
|
||||
$envelope = $this->serializer->decode($redisEnvelope);
|
||||
}
|
||||
} catch (MessageDecodingFailedException $exception) {
|
||||
$this->connection->reject($message['id']);
|
||||
|
||||
throw $exception;
|
||||
} catch (MessageDecodingFailedException $e) {
|
||||
return [
|
||||
MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps),
|
||||
];
|
||||
}
|
||||
|
||||
return [$envelope
|
||||
->withoutAll(TransportMessageIdStamp::class)
|
||||
->with(
|
||||
new RedisReceivedStamp($message['id']),
|
||||
new TransportMessageIdStamp($message['id'])
|
||||
)];
|
||||
return [
|
||||
$envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps),
|
||||
];
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
"require": {
|
||||
"php": ">=8.4",
|
||||
"ext-redis": "*",
|
||||
"symfony/messenger": "^7.4|^8.0"
|
||||
"symfony/messenger": "^8.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"symfony/property-access": "^7.4|^8.0",
|
||||
|
||||
Reference in New Issue
Block a user