[Messenger] Move Transports to separate packages

This commit is contained in:
Nyholm
2020-01-21 17:10:46 +01:00
committed by Fabien Potencier
commit 7b2a8d6121
20 changed files with 1324 additions and 0 deletions

3
.gitattributes vendored Normal file
View File

@@ -0,0 +1,3 @@
/Tests export-ignore
/phpunit.xml.dist export-ignore
/.gitignore export-ignore

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
vendor/
composer.lock
phpunit.xml

7
CHANGELOG.md Normal file
View File

@@ -0,0 +1,7 @@
CHANGELOG
=========
5.1.0
-----
* Introduced the Redis bridge.

19
LICENSE Normal file
View File

@@ -0,0 +1,19 @@
Copyright (c) 2018-2020 Fabien Potencier
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

12
README.md Normal file
View File

@@ -0,0 +1,12 @@
Redis Messenger
===============
Provides Redis integration for Symfony Messenger.
Resources
---------
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)

View File

@@ -0,0 +1,18 @@
<?php
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures;
class DummyMessage
{
private $message;
public function __construct(string $message)
{
$this->message = $message;
}
public function getMessage(): string
{
return $this->message;
}
}

View File

@@ -0,0 +1,250 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
/**
* @requires extension redis >= 4.3.0
*/
class ConnectionTest extends TestCase
{
public static function setUpBeforeClass(): void
{
$redis = Connection::fromDsn('redis://localhost/queue');
try {
$redis->get();
} catch (TransportException $e) {
if (0 === strpos($e->getMessage(), 'ERR unknown command \'X')) {
self::markTestSkipped('Redis server >= 5 is required');
}
throw $e;
}
}
public function testFromInvalidDsn()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
Connection::fromDsn('redis://');
}
public function testFromDsn()
{
$this->assertEquals(
new Connection(['stream' => 'queue'], [
'host' => 'localhost',
'port' => 6379,
]),
Connection::fromDsn('redis://localhost/queue')
);
}
public function testFromDsnOnUnixSocket()
{
$this->assertEquals(
new Connection(['stream' => 'queue'], [
'host' => '/var/run/redis/redis.sock',
'port' => 0,
], [], $redis = $this->createMock(\Redis::class)),
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue'], $redis)
);
}
public function testFromDsnWithOptions()
{
$this->assertEquals(
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
);
}
public function testFromDsnWithQueryOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
);
}
public function testKeepGettingPendingMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(3))->method('xreadgroup')
->with('symfony', 'consumer', ['queue' => 0], 1, null)
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
}
public function testAuth()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('auth')
->with('password')
->willReturn(true);
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
}
public function testFailedAuth()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Redis connection failed');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('auth')
->with('password')
->willReturn(false);
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
}
public function testDbIndex()
{
$redis = new \Redis();
Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);
$this->assertSame(2, $redis->getDbNum());
}
public function testFirstGetPendingMessagesThenNewMessages()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$count = 0;
$redis->expects($this->exactly(2))->method('xreadgroup')
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
++$count;
if (1 === $count) {
return '0' === $arr_streams['queue'];
}
return '>' === $arr_streams['queue'];
}), 1, null)
->willReturn(['queue' => []]);
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection->get();
}
public function testUnexpectedRedisError()
{
$this->expectException(TransportException::class);
$this->expectExceptionMessage('Redis error happens');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
$connection->get();
}
public function testGetAfterReject()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
$connection->add('1', []);
$connection->add('2', []);
$failing = $connection->get();
$connection->reject($failing['id']);
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
$this->assertNotNull($connection->get());
$redis->del('messenger-rejectthenget');
}
public function testGetNonBlocking()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}
public function testJsonError()
{
$redis = new \Redis();
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
try {
$connection->add("\xB1\x31", []);
} catch (TransportException $e) {
}
$this->assertSame('Malformed UTF-8 characters, possibly incorrectly encoded', $e->getMessage());
}
public function testMaxEntries()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('xadd')
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
->willReturn(1);
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
$connection->add('1', []);
}
public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->once())->method('xadd')->willReturn(0);
$redis->expects($this->once())->method('xack')->willReturn(0);
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
$redis->expects($this->exactly(2))->method('clearLastError');
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
try {
$connection->add('message', []);
} catch (TransportException $e) {
}
$this->assertSame('xadd error', $e->getMessage());
try {
$connection->ack('1');
} catch (TransportException $e) {
}
$this->assertSame('xack error', $e->getMessage());
}
}

View File

@@ -0,0 +1,103 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
/**
* @requires extension redis
* @group time-sensitive
*/
class RedisExtIntegrationTest extends TestCase
{
private $redis;
private $connection;
protected function setUp(): void
{
if (!getenv('MESSENGER_REDIS_DSN')) {
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
}
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
$this->connection->cleanup();
$this->connection->setup();
}
public function testConnectionSendAndGet()
{
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testGetTheFirstAvailableMessage()
{
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testConnectionSendWithSameContent()
{
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers);
$this->connection->add($body, $headers);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
public function testConnectionSendAndGetDelayed()
{
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 500);
$encoded = $this->connection->get();
$this->assertNull($encoded);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testConnectionSendDelayedMessagesWithSameContent()
{
$body = '{"message": "Hi"}';
$headers = ['type' => DummyMessage::class];
$this->connection->add($body, $headers, 500);
$this->connection->add($body, $headers, 500);
sleep(2);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
$encoded = $this->connection->get();
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
}

View File

@@ -0,0 +1,76 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
class RedisReceiverTest extends TestCase
{
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();
$redisEnvelop = $this->createRedisEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($redisEnvelop);
$receiver = new RedisReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
{
$this->expectException(MessageDecodingFailedException::class);
$serializer = $this->createMock(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$redisEnvelop = $this->createRedisEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($redisEnvelop);
$connection->expects($this->once())->method('reject');
$receiver = new RedisReceiver($connection, $serializer);
$receiver->get();
}
private function createRedisEnvelope(): array
{
return [
'id' => 1,
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
];
}
private function createSerializer(): Serializer
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
return $serializer;
}
}

View File

@@ -0,0 +1,39 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisSender;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class RedisSenderTest extends TestCase
{
public function testSend()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new RedisSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@@ -0,0 +1,42 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @requires extension redis
*/
class RedisTransportFactoryTest extends TestCase
{
public function testSupportsOnlyRedisTransports()
{
$factory = new RedisTransportFactory();
$this->assertTrue($factory->supports('redis://localhost', []));
$this->assertFalse($factory->supports('sqs://localhost', []));
$this->assertFalse($factory->supports('invalid-dsn', []));
}
public function testCreateTransport()
{
$factory = new RedisTransportFactory();
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer));
}
}

View File

@@ -0,0 +1,60 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Tests\Transport;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class RedisTransportTest extends TestCase
{
public function testItIsATransport()
{
$transport = $this->getTransport();
$this->assertInstanceOf(TransportInterface::class, $transport);
}
public function testReceivesMessages()
{
$transport = $this->getTransport(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
);
$decodedMessage = new DummyMessage('Decoded.');
$redisEnvelope = [
'id' => '5',
'body' => 'body',
'headers' => ['my' => 'header'],
];
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($redisEnvelope);
$envelopes = $transport->get();
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): RedisTransport
{
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
return new RedisTransport($connection, $serializer);
}
}

329
Transport/Connection.php Normal file
View File

@@ -0,0 +1,329 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* A Redis connection.
*
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
* @author Robin Chalas <robin.chalas@gmail.com>
*
* @internal
* @final
*/
class Connection
{
private const DEFAULT_OPTIONS = [
'stream' => 'messages',
'group' => 'symfony',
'consumer' => 'consumer',
'auto_setup' => true,
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0,
];
private $connection;
private $stream;
private $queue;
private $group;
private $consumer;
private $autoSetup;
private $maxEntries;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
{
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
}
$this->connection = $redis ?: new \Redis();
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
if (isset($connectionCredentials['auth']) && !$this->connection->auth($connectionCredentials['auth'])) {
throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError()));
}
if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) {
throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError()));
}
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
}
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
{
$url = $dsn;
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
$url = str_replace('redis:', 'file:', $dsn);
}
if (false === $parsedUrl = parse_url($url)) {
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $redisOptions);
}
$autoSetup = null;
if (\array_key_exists('auto_setup', $redisOptions)) {
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
unset($redisOptions['auto_setup']);
}
$maxEntries = null;
if (\array_key_exists('stream_max_entries', $redisOptions)) {
$maxEntries = filter_var($redisOptions['stream_max_entries'], FILTER_VALIDATE_INT);
unset($redisOptions['stream_max_entries']);
}
$dbIndex = null;
if (\array_key_exists('dbindex', $redisOptions)) {
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
unset($redisOptions['dbindex']);
}
$configuration = [
'stream' => $redisOptions['stream'] ?? null,
'group' => $redisOptions['group'] ?? null,
'consumer' => $redisOptions['consumer'] ?? null,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'dbindex' => $dbIndex,
];
if (isset($parsedUrl['host'])) {
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
'auth' => $parsedUrl['pass'] ?? $parsedUrl['user'] ?? null,
];
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$configuration['stream'] = $pathParts[1] ?? $configuration['stream'];
$configuration['group'] = $pathParts[2] ?? $configuration['group'];
$configuration['consumer'] = $pathParts[3] ?? $configuration['consumer'];
} else {
$connectionCredentials = [
'host' => $parsedUrl['path'],
'port' => 0,
];
}
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
}
public function get(): ?array
{
if ($this->autoSetup) {
$this->setup();
}
try {
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if ($queuedMessageCount) {
for ($i = 0; $i < $queuedMessageCount; ++$i) {
try {
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
foreach ($queuedMessages as $queuedMessage => $time) {
$queuedMessage = json_decode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add(
$queuedMessage['body'],
$queuedMessage['headers'],
$time - $this->getCurrentTimeInMilliseconds()
);
}
}
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
$messageId = '0'; // will receive consumers pending messages
}
try {
$messages = $this->connection->xreadgroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
1
);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (false === $messages) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
}
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
$this->couldHavePendingMessages = false;
// No pending messages so get a new one
return $this->get();
}
foreach ($messages[$this->stream] ?? [] as $key => $message) {
$redisEnvelope = json_decode($message['message'], true);
return [
'id' => $key,
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
];
}
return null;
}
public function ack(string $id): void
{
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$acknowledged) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
}
}
public function reject(string $id): void
{
try {
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (!$deleted) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
}
}
public function add(string $body, array $headers, int $delayInMs = 0): void
{
if ($this->autoSetup) {
$this->setup();
}
try {
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
$message = json_encode([
'body' => $body,
'headers' => $headers,
// Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
'uniqid' => uniqid('', true),
]);
if (false === $message) {
throw new TransportException(json_last_error_msg());
}
$score = (int) ($this->getCurrentTimeInMilliseconds() + $delayInMs);
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
} else {
$message = json_encode([
'body' => $body,
'headers' => $headers,
]);
if (false === $message) {
throw new TransportException(json_last_error_msg());
}
if ($this->maxEntries) {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
} else {
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
}
}
} catch (\RedisException $e) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? $e->getMessage(), 0, $e);
}
if (!$added) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
}
}
public function setup(): void
{
try {
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
// group might already exist, ignore
if ($this->connection->getLastError()) {
$this->connection->clearLastError();
}
$this->autoSetup = false;
}
private function getCurrentTimeInMilliseconds(): int
{
return (int) (microtime(true) * 1000);
}
public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}
}
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\RedisExt\Connection::class);

View File

@@ -0,0 +1,33 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
*/
class RedisReceivedStamp implements NonSendableStampInterface
{
private $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}
class_alias(RedisReceivedStamp::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisReceivedStamp::class);

View File

@@ -0,0 +1,89 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
$redisEnvelope = $this->connection->get();
if (null === $redisEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($redisEnvelope['id']);
throw $exception;
}
return [$envelope->with(new RedisReceivedStamp($redisEnvelope['id']))];
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
}
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
{
/** @var RedisReceivedStamp|null $redisReceivedStamp */
$redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
if (null === $redisReceivedStamp) {
throw new LogicException('No RedisReceivedStamp found on the Envelope.');
}
return $redisReceivedStamp;
}
}
class_alias(RedisReceiver::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver::class);

50
Transport/RedisSender.php Normal file
View File

@@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisSender implements SenderInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer)
{
$this->connection = $connection;
$this->serializer = $serializer;
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
return $envelope;
}
}
class_alias(RedisSender::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisSender::class);

View File

@@ -0,0 +1,87 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransport implements TransportInterface, SetupableTransportInterface
{
private $serializer;
private $connection;
private $receiver;
private $sender;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return ($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
private function getReceiver(): RedisReceiver
{
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
}
private function getSender(): RedisSender
{
return $this->sender = new RedisSender($this->connection, $this->serializer);
}
}
class_alias(RedisTransport::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisTransport::class);

View File

@@ -0,0 +1,36 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Alexander Schranz <alexander@suluio>
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class RedisTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
unset($options['transport_name']);
return new RedisTransport(Connection::fromDsn($dsn, $options), $serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'redis://');
}
}
class_alias(RedisTransportFactory::class, \Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory::class);

38
composer.json Normal file
View File

@@ -0,0 +1,38 @@
{
"name": "symfony/redis-messenger",
"type": "symfony-bridge",
"description": "Symfony Redis extension Messenger Bridge",
"keywords": [],
"homepage": "https://symfony.com",
"license": "MIT",
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"require": {
"php": "^7.2.5",
"symfony/messenger": "^5.1"
},
"require-dev": {
"symfony/property-access": "^4.4|^5.0",
"symfony/serializer": "^4.4|^5.0"
},
"autoload": {
"psr-4": { "Symfony\\Component\\Messenger\\Bridge\\Redis\\": "" },
"exclude-from-classmap": [
"/Tests/"
]
},
"minimum-stability": "dev",
"extra": {
"branch-alias": {
"dev-master": "5.1-dev"
}
}
}

30
phpunit.xml.dist Normal file
View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/5.2/phpunit.xsd"
backupGlobals="false"
colors="true"
bootstrap="vendor/autoload.php"
failOnRisky="true"
failOnWarning="true"
>
<php>
<ini name="error_reporting" value="-1" />
</php>
<testsuites>
<testsuite name="Symfony Redis Messenger Component Test Suite">
<directory>./Tests/</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory>./</directory>
<exclude>
<directory>./Tests</directory>
<directory>./vendor</directory>
</exclude>
</whitelist>
</filter>
</phpunit>