[Messenger] Add ListableReceiverInterface support to RedisReceiver

This commit is contained in:
Mudassar
2025-08-20 00:45:54 +05:00
committed by Nicolas Grekas
parent ad4c444e5d
commit f9d7785dd2
6 changed files with 508 additions and 16 deletions

View File

@@ -5,6 +5,7 @@ CHANGELOG
---
* Add option `cluster` to force cluster mode
* Implement the `ListableReceiverInterface` to enable listing and finding messages in Redis streams
7.4
---

View File

@@ -670,4 +670,96 @@ class ConnectionTest extends TestCase
return (new \ReflectionFunction($initializer))->getStaticVariables();
}
public function testFindAllReturnsAllMessages()
{
$redis = $this->createRedisMock();
$redis->expects($this->exactly(1))->method('xRange')
->with('queue', '-', '+')
->willReturn([
'1234567890-0' => ['message' => json_encode(['body' => 'test1', 'headers' => []])],
'1234567890-1' => ['message' => json_encode(['body' => 'test2', 'headers' => []])],
]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$messages = $connection->findAll();
$this->assertCount(2, $messages);
$this->assertEquals('1234567890-0', $messages[0]['id']);
$this->assertEquals('1234567890-1', $messages[1]['id']);
$this->assertArrayHasKey('data', $messages[0]);
$this->assertArrayHasKey('data', $messages[1]);
}
public function testFindAllWithLimit()
{
$redis = $this->createRedisMock();
$redis->expects($this->exactly(1))->method('xRange')
->with('queue', '-', '+', 1)
->willReturn([
'1234567890-0' => ['message' => json_encode(['body' => 'test1', 'headers' => []])],
]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$messages = $connection->findAll(1);
$this->assertCount(1, $messages);
$this->assertEquals('1234567890-0', $messages[0]['id']);
}
public function testFindAllWhenRedisExceptionOccurs()
{
$redis = $this->createRedisMock();
$redis->expects($this->exactly(1))->method('xRange')
->with('queue', '-', '+')
->willThrowException($exception = new \RedisException('Something went wrong'));
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->findAll();
}
public function testFindReturnsMessageById()
{
$redis = $this->createRedisMock();
$redis->expects($this->exactly(1))->method('xRange')
->with('queue', '1234567890-0', '1234567890-0', 1)
->willReturn([
'1234567890-0' => ['message' => json_encode(['body' => 'test1', 'headers' => []])],
]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$message = $connection->find('1234567890-0');
$this->assertNotNull($message);
$this->assertEquals('1234567890-0', $message['id']);
$this->assertArrayHasKey('data', $message);
}
public function testFindReturnsNullForNonExistentMessage()
{
$redis = $this->createRedisMock();
$redis->expects($this->exactly(1))->method('xRange')
->with('queue', '9999999999-0', '9999999999-0', 1)
->willReturn([]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$message = $connection->find('9999999999-0');
$this->assertNull($message);
}
public function testFindReturnsNullForInvalidId()
{
$connection = Connection::fromDsn('redis://localhost/queue', [], $this->createRedisMock());
$message = $connection->find(123);
$this->assertNull($message);
}
}

View File

@@ -0,0 +1,122 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\Attributes\Group;
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceivedStamp;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
#[RequiresPhpExtension('redis')]
#[Group('integration')]
class RedisListableIntegrationTest extends TestCase
{
private ?Connection $connection = null;
private ?\Redis $redis = null;
private string $streamName;
protected function setUp(): void
{
$this->streamName = 'test-stream-'.uniqid();
$this->redis = new \Redis();
try {
$this->redis->connect('127.0.0.1', 6379);
$this->redis->ping();
} catch (\RedisException $e) {
$this->markTestSkipped('Redis server is not available: '.$e->getMessage());
}
$this->connection = Connection::fromDsn('redis://127.0.0.1:6379/'.$this->streamName, [], $this->redis);
}
protected function tearDown(): void
{
if ($this->redis) {
$this->redis->del($this->streamName);
$this->redis->close();
}
}
public function testAllReturnsAllMessages()
{
$receiver = $this->createReceiver();
$this->assertInstanceOf(ListableReceiverInterface::class, $receiver);
$this->addEnvelope(new Envelope(new DummyMessage('Hi')));
$this->addEnvelope(new Envelope(new DummyMessage('Hello')));
$envelopes = iterator_to_array($receiver->all());
$this->assertCount(2, $envelopes);
$this->assertEquals(new DummyMessage('Hi'), $envelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hello'), $envelopes[1]->getMessage());
$this->assertNotNull($envelopes[0]->last(TransportMessageIdStamp::class));
$this->assertNotNull($envelopes[0]->last(RedisReceivedStamp::class));
}
public function testAllWithLimit()
{
$this->addEnvelope(new Envelope(new DummyMessage('Hi')));
$this->addEnvelope(new Envelope(new DummyMessage('Hello')));
$envelopes = iterator_to_array($this->createReceiver()->all(1));
$this->assertCount(1, $envelopes);
}
public function testFindReturnsMessageById()
{
$this->addEnvelope(new Envelope(new DummyMessage('Hi')));
$messageId = $this->connection->findAll()[0]['id'];
$foundEnvelope = $this->createReceiver()->find($messageId);
$this->assertNotNull($foundEnvelope);
$this->assertEquals(new DummyMessage('Hi'), $foundEnvelope->getMessage());
$this->assertNotNull($foundEnvelope->last(TransportMessageIdStamp::class));
$this->assertNotNull($foundEnvelope->last(RedisReceivedStamp::class));
}
public function testFindReturnsNullForNonExistentMessage()
{
$this->assertNull($this->createReceiver()->find('9999999999-0'));
}
private function createReceiver(): RedisReceiver
{
return new RedisReceiver($this->connection, $this->createSerializer());
}
private function createSerializer(): Serializer
{
return new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
}
private function addEnvelope(Envelope $envelope): void
{
$encoded = $this->createSerializer()->encode($envelope);
$this->connection->add($encoded['body'], $encoded['headers']);
}
}

View File

@@ -144,4 +144,176 @@ class RedisReceiverTest extends TestCase
));
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new RedisReceivedStamp('redisid-123')]));
}
public function testAllReturnsAllMessages()
{
$messages = [
[
'id' => '1',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
[
'id' => '2',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hello"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
];
$connection = $this->createStub(Connection::class);
$connection->method('findAll')->willReturn($messages);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelopes = iterator_to_array($receiver->all());
$this->assertCount(2, $envelopes);
$this->assertEquals(new DummyMessage('Hi'), $envelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hello'), $envelopes[1]->getMessage());
$this->assertNotNull($envelopes[0]->last(TransportMessageIdStamp::class));
$this->assertNotNull($envelopes[0]->last(RedisReceivedStamp::class));
$this->assertNotNull($envelopes[1]->last(TransportMessageIdStamp::class));
$this->assertNotNull($envelopes[1]->last(RedisReceivedStamp::class));
}
public function testAllWithLimit()
{
$messages = [
[
'id' => '1',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
];
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('findAll')->with(1)->willReturn($messages);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelopes = iterator_to_array($receiver->all(1));
$this->assertCount(1, $envelopes);
}
public function testAllSkipsInvalidMessages()
{
$messages = [
[
'id' => '1',
'data' => null,
],
[
'id' => '2',
'data' => [
'message' => 'invalid-json',
],
],
[
'id' => '3',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
];
$connection = $this->createStub(Connection::class);
$connection->method('findAll')->willReturn($messages);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelopes = iterator_to_array($receiver->all());
$this->assertCount(1, $envelopes);
$this->assertEquals(new DummyMessage('Hi'), $envelopes[0]->getMessage());
}
public function testFindReturnsMessageById()
{
$message = [
'id' => '123',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
];
$connection = $this->createMock(Connection::class);
$connection->method('find')->with('123')->willReturn($message);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelope = $receiver->find('123');
$this->assertNotNull($envelope);
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$this->assertNotNull($envelope->last(TransportMessageIdStamp::class));
$this->assertNotNull($envelope->last(RedisReceivedStamp::class));
}
public function testFindReturnsNullForNonExistentMessage()
{
$connection = $this->createMock(Connection::class);
$connection->method('find')->with('999')->willReturn(null);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelope = $receiver->find('999');
$this->assertNull($envelope);
}
public function testFindReturnsNullForInvalidJson()
{
$message = [
'id' => '123',
'data' => [
'message' => 'invalid-json',
],
];
$connection = $this->createMock(Connection::class);
$connection->method('find')->with('123')->willReturn($message);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelope = $receiver->find('123');
$this->assertNull($envelope);
}
}

View File

@@ -766,6 +766,70 @@ class Connection
return $this->redis;
}
public function findAll(?int $limit = null): array
{
$redis = $this->getRedis();
try {
if (null === $limit) {
$range = $redis->xRange($this->stream, '-', '+');
} else {
$range = $redis->xRange($this->stream, '-', '+', $limit);
}
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$range) {
if ($error = $redis->getLastError() ?: null) {
$redis->clearLastError();
}
return [];
}
$messages = [];
foreach ($range as $id => $data) {
$messages[] = [
'id' => $id,
'data' => $data,
];
}
return $messages;
}
public function find(mixed $id): ?array
{
if (!\is_string($id)) {
return null;
}
$redis = $this->getRedis();
try {
$range = $redis->xRange($this->stream, $id, $id, 1);
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$range) {
if ($error = $redis->getLastError() ?: null) {
$redis->clearLastError();
}
return null;
}
$data = current($range);
$messageId = key($range);
return [
'id' => $messageId,
'data' => $data,
];
}
public function close(): void
{
$this->redis = null;

View File

@@ -17,6 +17,7 @@ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -25,7 +26,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private SerializerInterface $serializer;
@@ -38,9 +39,7 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
public function get(): iterable
{
$message = $this->connection->get();
if (null === $message) {
if (null === $message = $this->connection->get()) {
return [];
}
@@ -56,9 +55,7 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
return $this->get();
}
$redisEnvelope = json_decode($message['data']['message'] ?? '', true);
if (null === $redisEnvelope) {
if (null === $redisEnvelope = json_decode($message['data']['message'] ?? '', true)) {
return [];
}
@@ -87,17 +84,17 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
$this->connection->ack($this->findRedisReceivedStampId($envelope));
}
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
$this->connection->reject($this->findRedisReceivedStampId($envelope));
}
public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
$this->connection->keepalive($this->findRedisReceivedStamp($envelope)->getId(), $seconds);
$this->connection->keepalive($this->findRedisReceivedStampId($envelope), $seconds);
}
public function getMessageCount(): int
@@ -105,15 +102,59 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
return $this->connection->getMessageCount();
}
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
public function all(?int $limit = null): iterable
{
/** @var RedisReceivedStamp|null $redisReceivedStamp */
$redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
$messages = $this->connection->findAll($limit);
if (null === $redisReceivedStamp) {
throw new LogicException('No RedisReceivedStamp found on the Envelope.');
foreach ($messages as $message) {
if (null !== $envelope = $this->createEnvelopeFromData($message['id'], $message['data']['message'] ?? null)) {
yield $envelope;
}
}
}
public function find(mixed $id): ?Envelope
{
if (null === $message = $this->connection->find($id)) {
return null;
}
return $redisReceivedStamp;
return $this->createEnvelopeFromData($message['id'], $message['data']['message'] ?? null);
}
private function createEnvelopeFromData(string $id, ?string $json): ?Envelope
{
if (null === $json) {
return null;
}
if (null === $redisEnvelope = json_decode($json, true)) {
return null;
}
try {
if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) {
$envelope = $this->serializer->decode([
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
]);
} else {
$envelope = $this->serializer->decode($redisEnvelope);
}
} catch (MessageDecodingFailedException) {
return null;
}
return $envelope
->withoutAll(TransportMessageIdStamp::class)
->with(
new RedisReceivedStamp($id),
new TransportMessageIdStamp($id)
);
}
private function findRedisReceivedStampId(Envelope $envelope): string
{
return $envelope->last(RedisReceivedStamp::class)?->getId() ?? throw new LogicException('No RedisReceivedStamp found on the Envelope.');
}
}