diff --git a/UPGRADE-8.1.md b/UPGRADE-8.1.md index dd6cae645e4..ae7f0aec1d3 100644 --- a/UPGRADE-8.1.md +++ b/UPGRADE-8.1.md @@ -94,6 +94,7 @@ Messenger custom serializers that still throw are supported via a BC fallback in receivers * Receivers no longer delete messages from the queue on decode failure; they are routed through the normal retry/failure transport path instead + * Add argument `$fetchSize` to `ReceiverInterface::get()` and `QueueReceiverInterface::getFromQueues()` Security -------- diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php index 814e8472c8a..013baaa83b5 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php @@ -53,14 +53,14 @@ class AmazonSqsIntegrationTest extends TestCase usleep(5000); } - $this->assertEquals('{"message": "Hi"}', $encoded['body']); - $this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']); + $this->assertEquals('{"message": "Hi"}', $encoded[0]['body']); + $this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded[0]['headers']); $this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt); - $connection->keepalive($encoded['id']); + $connection->keepalive($encoded[0]['id']); $this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt); $this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended'); - $connection->delete($encoded['id']); + $connection->delete($encoded[0]['id']); } private function waitUntilElapsed(float $seconds, float $since): void diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php index cb241c7d100..086403c184a 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php @@ -32,7 +32,7 @@ class AmazonSqsReceiverTest extends TestCase $sqsEnvelop = $this->createSqsEnvelope(); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($sqsEnvelop); + $connection->method('get')->willReturn([$sqsEnvelop]); $receiver = new AmazonSqsReceiver($connection, $serializer); $actualEnvelopes = iterator_to_array($receiver->get()); @@ -40,6 +40,44 @@ class AmazonSqsReceiverTest extends TestCase $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testGetUsesFetchSizeWhenProvided() + { + $serializer = $this->createSerializer(); + + $sqsEnvelope = $this->createSqsEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('get')->with(7)->willReturn([$sqsEnvelope]); + + $receiver = new AmazonSqsReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get(7)); + + $this->assertCount(1, $actualEnvelopes); + } + + public function testItReturnsMultipleDecodedMessagesWhenAvailable() + { + $serializer = $this->createSerializer(); + + $connection = $this->createStub(Connection::class); + $connection->method('get')->willReturn([ + $this->createSqsEnvelope(), + [ + 'id' => 2, + 'body' => '{"message": "Hello"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ], + ]); + + $receiver = new AmazonSqsReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get(2)); + + $this->assertCount(2, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hello'), $actualEnvelopes[1]->getMessage()); + } + public function testItReturnsSerializedEnvelopeWhenDecodingFails() { $serializer = $this->createStub(PhpSerializer::class); @@ -47,7 +85,7 @@ class AmazonSqsReceiverTest extends TestCase $sqsEnvelop = $this->createSqsEnvelope(); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($sqsEnvelop); + $connection->method('get')->willReturn([$sqsEnvelop]); $receiver = new AmazonSqsReceiver($connection, $serializer); $envelopes = iterator_to_array($receiver->get()); diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php index 698f276d48a..fe82a6b6fab 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php @@ -54,7 +54,7 @@ class AmazonSqsTransportTest extends TestCase ]; $serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); - $connection->method('get')->willReturn($sqsEnvelope); + $connection->method('get')->willReturn([$sqsEnvelope]); $envelopes = iterator_to_array($transport->get()); $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php index 36e769bcdaf..c77ad26a5c0 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php @@ -280,6 +280,48 @@ class ConnectionTest extends TestCase $this->assertNull($connection->get()); } + public function testGetUsesMaxOfFetchSizeAndConfiguredBufferSize() + { + $client = $this->createMock(SqsClient::class); + $client + ->method('getQueueUrl') + ->willReturnMap([ + [['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123], ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'])], + ]); + + $firstResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [ + new Message(['MessageId' => 1, 'Body' => 'this is a test']), + ]]); + $secondResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => []]); + + $series = [ + [[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue', + 'VisibilityTimeout' => null, + 'MaxNumberOfMessages' => 12, + 'MessageAttributeNames' => ['All'], + 'WaitTimeSeconds' => 20]], $firstResult], + [[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue', + 'VisibilityTimeout' => null, + 'MaxNumberOfMessages' => 12, + 'MessageAttributeNames' => ['All'], + 'WaitTimeSeconds' => 20]], $secondResult], + ]; + + $client->expects($this->exactly(2)) + ->method('receiveMessage') + ->willReturnCallback(function (...$args) use (&$series) { + [$expectedArgs, $return] = array_shift($series); + $this->assertSame($expectedArgs, $args); + + return $return; + }) + ; + + $connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false, 'buffer_size' => 9], $client); + $this->assertNotNull($connection->get(12)); + $this->assertNull($connection->get(12)); + } + public function testUnexpectedSqsError() { $this->expectException(HttpException::class); diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php index de989334f33..1c96f010b1f 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php @@ -36,28 +36,35 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { + $fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1; + try { - if (!$sqsEnvelope = $this->connection->get()) { + if (!$sqsEnvelopes = $this->connection->get($fetchSize)) { return; } } catch (HttpException $e) { throw new TransportException($e->getMessage(), 0, $e); } - $stamps = [ - new AmazonSqsReceivedStamp($sqsEnvelope['id']), - new TransportMessageIdStamp($sqsEnvelope['id']), - ]; + foreach ($sqsEnvelopes as $sqsEnvelope) { + $stamps = [ + new AmazonSqsReceivedStamp($sqsEnvelope['id']), + new TransportMessageIdStamp($sqsEnvelope['id']), + ]; - try { - yield $this->serializer->decode($sqsEnvelope = [ - 'body' => $sqsEnvelope['body'], - 'headers' => $sqsEnvelope['headers'], - ])->with(...$stamps); - } catch (MessageDecodingFailedException $e) { - yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps); + try { + yield $this->serializer->decode($sqsEnvelope = [ + 'body' => $sqsEnvelope['body'], + 'headers' => $sqsEnvelope['headers'], + ])->with(...$stamps); + } catch (MessageDecodingFailedException $e) { + yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps); + } } } diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php index df36c9d3a89..0e7dba655ee 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php @@ -43,9 +43,14 @@ class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterfa $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - return $this->getReceiver()->get(); + $fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1; + + return $this->getReceiver()->get($fetchSize); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index e014103e300..927c5724d53 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -99,7 +99,7 @@ class Connection * * access_key: AWS access key * * secret_key: AWS secret key * * session_token: AWS session token (required only when using temporary credentials) - * * buffer_size: number of messages to prefetch (Default: 9) + * * buffer_size: number of messages to prefetch (Default: 9, Max: 10) * * wait_time: long polling duration in seconds (Default: 20) * * poll_timeout: amount of seconds the transport should wait for new message * * visibility_timeout: amount of seconds the message won't be visible @@ -188,61 +188,58 @@ class Connection return new self($configuration, new SqsClient($clientConfiguration, null, $client, $logger), $queueUrl); } - public function get(): ?array + public function get(int $fetchSize = 1): ?array { if ($this->configuration['auto_setup']) { $this->setup(); } - foreach ($this->getNextMessages() as $message) { - return $message; + $fetchSize = max(1, $fetchSize); + $messages = $this->getPendingMessages($fetchSize); + + if (\count($messages) < $fetchSize + && $this->fetchMessages(max($fetchSize, $this->configuration['buffer_size'])) + ) { + $messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))]; } - return null; + return $messages ?: null; } /** - * @return \Generator + * @return list */ - private function getNextMessages(): \Generator + private function getPendingMessages(int $fetchSize): array { - yield from $this->getPendingMessages(); - yield from $this->getNewMessages(); - } + $messages = []; - /** - * @return \Generator - */ - private function getPendingMessages(): \Generator - { - while ($this->buffer) { - yield array_shift($this->buffer); + while ($fetchSize-- > 0 && $this->buffer) { + $messages[] = array_shift($this->buffer); } + + return $messages; } - /** - * @return \Generator - */ - private function getNewMessages(): \Generator + private function fetchMessages(int $fetchSize): bool { if (null === $this->currentResponse) { $this->currentResponse = $this->client->receiveMessage([ 'QueueUrl' => $this->getQueueUrl(), 'VisibilityTimeout' => $this->configuration['visibility_timeout'], - 'MaxNumberOfMessages' => $this->configuration['buffer_size'], + 'MaxNumberOfMessages' => min($fetchSize, 10), // SQS limitation 'MessageAttributeNames' => ['All'], 'WaitTimeSeconds' => $this->configuration['wait_time'], ]); } - if (!$this->fetchMessage()) { - return; + if (!$this->fetchPendingMessages()) { + return false; } - yield from $this->getPendingMessages(); + return true; } - private function fetchMessage(): bool + private function fetchPendingMessages(): bool { if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) { return false; @@ -416,13 +413,13 @@ class Connection { if (null !== $this->currentResponse) { // fetch current response in order to requeue in transit messages - if (!$this->fetchMessage()) { + if (!$this->fetchPendingMessages()) { $this->currentResponse->cancel(); $this->currentResponse = null; } } - foreach ($this->getPendingMessages() as $message) { + foreach ($this->getPendingMessages(\count($this->buffer)) as $message) { $this->client->changeMessageVisibility([ 'QueueUrl' => $this->getQueueUrl(), 'ReceiptHandle' => $message['id'], diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php index 481c2e2e128..2ab7f9758da 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php @@ -49,6 +49,39 @@ class AmqpReceiverTest extends TestCase $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testGetAcceptsFetchSize() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->expects($this->exactly(2))->method('get')->with('queueName')->willReturnOnConsecutiveCalls($amqpEnvelope, null); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get(7)); + + $this->assertCount(1, $actualEnvelopes); + } + + public function testGetFromQueuesAcceptsFetchSize() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->expects($this->exactly(2))->method('get')->with('queueName')->willReturnOnConsecutiveCalls($amqpEnvelope, null); + + $receiver = new AmqpReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->getFromQueues(['queueName'], 7)); + + $this->assertCount(1, $actualEnvelopes); + } + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); @@ -158,10 +191,10 @@ class AmqpReceiverTest extends TestCase $this->assertInstanceOf(MessageDecodingFailedException::class, $envelopes[0]->getMessage()); } - private function createAMQPEnvelope(?string $messageId = null): \AMQPEnvelope + private function createAMQPEnvelope(?string $messageId = null, string $body = '{"message": "Hi"}'): \AMQPEnvelope { $envelope = $this->createStub(\AMQPEnvelope::class); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getBody')->willReturn($body); $envelope->method('getHeaders')->willReturn([ 'type' => DummyMessage::class, ]); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 3b3c5f10207..d5699c58a0d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -37,19 +37,52 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - yield from $this->getFromQueues($this->connection->getQueueNames()); + $fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1; + + yield from $this->getFromQueues($this->connection->getQueueNames(), $fetchSize); } - public function getFromQueues(array $queueNames): iterable + /** + * @param int $fetchSize + */ + public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable { - foreach ($queueNames as $queueName) { - yield from $this->getEnvelope($queueName); + $fetchSize = \func_num_args() > 1 ? max(1, func_get_arg(1)) : 1; + $remaining = $fetchSize; + $activeQueues = array_values($queueNames); + $firstRound = true; + + while ($activeQueues && ($remaining > 0 || $firstRound)) { + $exhausted = []; + + foreach ($activeQueues as $i => $queueName) { + if (null === $envelope = $this->getEnvelope($queueName)) { + $exhausted[] = $i; + continue; + } + + yield $envelope; + --$remaining; + + if ($remaining <= 0 && !$firstRound) { + return; + } + } + + $firstRound = false; + + foreach (array_reverse($exhausted) as $i) { + array_splice($activeQueues, $i, 1); + } } } - private function getEnvelope(string $queueName): iterable + private function getEnvelope(string $queueName): ?Envelope { try { $amqpEnvelope = $this->connection->get($queueName); @@ -68,7 +101,7 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface } if (null === $amqpEnvelope) { - return; + return null; } $body = $amqpEnvelope->getBody(); @@ -78,13 +111,15 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface ...($id ? [new TransportMessageIdStamp($id)] : []), ]; + $data = [ + 'body' => false === $body ? '' : $body, + 'headers' => $amqpEnvelope->getHeaders(), + ]; + try { - yield $this->serializer->decode($data = [ - 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351 - 'headers' => $amqpEnvelope->getHeaders(), - ])->withoutAll(TransportMessageIdStamp::class)->with(...$stamps); + return $this->serializer->decode($data)->withoutAll(TransportMessageIdStamp::class)->with(...$stamps); } catch (MessageDecodingFailedException $e) { - yield MessageDecodingFailedException::wrap($data, $e->getMessage(), $e->getCode(), $e)->with(...$stamps); + return MessageDecodingFailedException::wrap($data, $e->getMessage(), $e->getCode(), $e)->with(...$stamps); } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php index 0d8ff8b6c5b..bcea81fe828 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php @@ -36,14 +36,24 @@ class AmqpTransport implements QueueReceiverInterface, TransportInterface, Setup $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - return $this->getReceiver()->get(); + $fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1; + + return $this->getReceiver()->get($fetchSize); } - public function getFromQueues(array $queueNames): iterable + /** + * @param int $fetchSize + */ + public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable { - return $this->getReceiver()->getFromQueues($queueNames); + $fetchSize = \func_num_args() > 1 ? func_get_arg(1) : 1; + + return $this->getReceiver()->getFromQueues($queueNames, $fetchSize); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php index 933e4926f0d..d742f387708 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php @@ -35,7 +35,10 @@ class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwar $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { if (!$beanstalkdEnvelope = $this->connection->get()) { return; diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php index 6400e156538..de455a75715 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php @@ -34,9 +34,14 @@ class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterf $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - return $this->getReceiver()->get(); + $fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1; + + return $this->getReceiver()->get($fetchSize); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php index 97d11d2b857..1c426636b33 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php @@ -43,7 +43,7 @@ class ConnectionTest extends TestCase { $queryBuilder = $this->getQueryBuilderStub(); $driverConnection = $this->getDBALConnection(); - $stmt = $this->getResultMock([ + $stmt = $this->getGetResultMock([ 'id' => 1, 'body' => '{"message":"Hi"}', 'headers' => json_encode(['type' => DummyMessage::class]), @@ -70,16 +70,16 @@ class ConnectionTest extends TestCase $connection = new Connection([], $driverConnection); $doctrineEnvelope = $connection->get(); - $this->assertEquals(1, $doctrineEnvelope['id']); - $this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']); - $this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']); + $this->assertEquals(1, $doctrineEnvelope[0]['id']); + $this->assertEquals('{"message":"Hi"}', $doctrineEnvelope[0]['body']); + $this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope[0]['headers']); } public function testGetWithNoPendingMessageWillReturnNull() { $queryBuilder = $this->getQueryBuilderStub(); $driverConnection = $this->getDBALConnection(true); - $stmt = $this->getResultMock(false); + $stmt = $this->getGetResultMock(false); $queryBuilder ->method('getParameters') @@ -108,7 +108,7 @@ class ConnectionTest extends TestCase { $queryBuilder = $this->getQueryBuilderMock(); $driverConnection = $this->getDBALConnection(true); - $stmt = $this->getResultMock(false); + $stmt = $this->getGetResultMock(false); $queryBuilder ->method('getParameters') @@ -420,7 +420,18 @@ class ConnectionTest extends TestCase return $queryBuilder; } - private function getResultMock($expectedResult): Result&MockObject + private function getGetResultMock($expectedResult): Result&MockObject + { + $stmt = $this->createMock(Result::class); + + $stmt->expects($this->once()) + ->method('fetchAllAssociative') + ->willReturn(false === $expectedResult ? [] : [$expectedResult]); + + return $stmt; + } + + private function getFindResultMock($expectedResult): Result&MockObject { $stmt = $this->createMock(Result::class); @@ -534,7 +545,7 @@ class ConnectionTest extends TestCase $queryBuilder = $this->getQueryBuilderMock(); $driverConnection = $this->getDBALConnection(); $id = 1; - $stmt = $this->getResultMock([ + $stmt = $this->getFindResultMock([ 'id' => $id, 'body' => '{"message":"Hi"}', 'headers' => json_encode(['type' => DummyMessage::class]), @@ -624,7 +635,7 @@ class ConnectionTest extends TestCase $driverConnection->method('createQueryBuilder')->willReturnCallback(static fn () => new QueryBuilder($driverConnection)); $result = $this->createStub(Result::class); - $result->method('fetchAssociative')->willReturn(false); + $result->method('fetchAllAssociative')->willReturn([]); $driverConnection->expects($this->once())->method('beginTransaction'); $driverConnection diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php index a84d0462846..c944cb9c53e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineIntegrationTest.php @@ -46,8 +46,8 @@ class DoctrineIntegrationTest extends TestCase { $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertEquals('{"message": "Hi"}', $encoded['body']); - $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + $this->assertEquals('{"message": "Hi"}', $encoded[0]['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']); } public function testSendWithDelay() @@ -117,7 +117,7 @@ class DoctrineIntegrationTest extends TestCase ]); $encoded = $this->connection->get(); - $this->assertEquals('{"message": "Hi available"}', $encoded['body']); + $this->assertEquals('{"message": "Hi available"}', $encoded[0]['body']); } public function testItCountMessages() @@ -182,8 +182,8 @@ class DoctrineIntegrationTest extends TestCase ]); $next = $this->connection->get(); - $this->assertEquals('{"message": "Hi requeued"}', $next['body']); - $this->connection->reject($next['id']); + $this->assertEquals('{"message": "Hi requeued"}', $next[0]['body']); + $this->connection->reject($next[0]['id']); } public function testTheTransportIsSetupOnGet() @@ -194,7 +194,7 @@ class DoctrineIntegrationTest extends TestCase $this->connection->send('the body', ['my' => 'header']); $envelope = $this->connection->get(); - $this->assertEquals('the body', $envelope['body']); + $this->assertEquals('the body', $envelope[0]['body']); } private function formatDateTime(\DateTimeImmutable $dateTime): string diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php index 1002e6d5a43..1e944b25a58 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlIntegrationTest.php @@ -59,8 +59,8 @@ class DoctrinePostgreSqlIntegrationTest extends TestCase $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertEquals('{"message": "Hi"}', $encoded['body']); - $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + $this->assertEquals('{"message": "Hi"}', $encoded[0]['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } @@ -72,8 +72,8 @@ class DoctrinePostgreSqlIntegrationTest extends TestCase $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $connection->get(); - $this->assertEquals('{"message": "Hi"}', $encoded['body']); - $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + $this->assertEquals('{"message": "Hi"}', $encoded[0]['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($connection->get()); } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php index 6791923e4f0..4cd2ddd299d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php @@ -37,8 +37,8 @@ class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertSame('{"message": "Hi"}', $encoded['body']); - $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + $this->assertSame('{"message": "Hi"}', $encoded[0]['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } @@ -50,8 +50,8 @@ class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertSame('{"message": "Hi"}', $encoded['body']); - $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + $this->assertSame('{"message": "Hi"}', $encoded[0]['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php index c9ab640667a..0b3f672ba62 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php @@ -37,8 +37,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertSame('{"message": "Hi"}', $encoded['body']); - $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + $this->assertSame('{"message": "Hi"}', $encoded[0]['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } @@ -50,8 +50,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $this->connection->get(); - $this->assertSame('{"message": "Hi"}', $encoded['body']); - $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + $this->assertSame('{"message": "Hi"}', $encoded[0]['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } @@ -64,8 +64,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); $encoded = $connection->get(); - $this->assertSame('{"message": "Hi"}', $encoded['body']); - $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + $this->assertSame('{"message": "Hi"}', $encoded[0]['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']); $this->assertNull($this->connection->get()); } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php index 55819ca5759..231e34895b6 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php @@ -38,7 +38,7 @@ class DoctrineReceiverTest extends TestCase $doctrineEnvelope = $this->createDoctrineEnvelope(); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($doctrineEnvelope); + $connection->method('get')->willReturn([$doctrineEnvelope]); $receiver = new DoctrineReceiver($connection, $serializer); $actualEnvelopes = $receiver->get(); @@ -65,7 +65,7 @@ class DoctrineReceiverTest extends TestCase $doctrineEnvelop = $this->createDoctrineEnvelope(); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($doctrineEnvelop); + $connection->method('get')->willReturn([$doctrineEnvelop]); $receiver = new DoctrineReceiver($connection, $serializer); $envelopes = $receiver->get(); @@ -100,7 +100,7 @@ class DoctrineReceiverTest extends TestCase $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($doctrineEnvelope); + $connection->method('get')->willReturn([$doctrineEnvelope]); $receiver = new DoctrineReceiver($connection, $serializer); $actualEnvelopes = $receiver->get(); @@ -111,6 +111,20 @@ class DoctrineReceiverTest extends TestCase $this->assertCount(1, $messageIdStamps); } + public function testGetUsesFetchSizeWhenProvided() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('get')->with(7)->willReturn([$doctrineEnvelope]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(7); + + $this->assertCount(1, $actualEnvelopes); + } + public function testAll() { $serializer = $this->createSerializer(); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php index 7d633f63dd5..742cd93c525 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php @@ -47,7 +47,7 @@ class DoctrineTransportTest extends TestCase ]; $serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); - $connection->method('get')->willReturn($doctrineEnvelope); + $connection->method('get')->willReturn([$doctrineEnvelope]); $envelopes = $transport->get(); $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index db058486768..37b85290aea 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; +use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Connection as DBALConnection; use Doctrine\DBAL\Exception as DBALException; use Doctrine\DBAL\Exception\TableNotFoundException; @@ -154,14 +155,14 @@ class Connection implements ResetInterface ]); } - public function get(): ?array + public function get(int $fetchSize = 1): ?array { get: $this->driverConnection->beginTransaction(); try { $query = $this->createAvailableMessagesQueryBuilder() ->orderBy('available_at', 'ASC') - ->setMaxResults(1); + ->setMaxResults($fetchSize); if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) { $query->select('m.id'); @@ -181,13 +182,13 @@ class Connection implements ResetInterface $sql = $this->addLockMode($query, $sql); - $doctrineEnvelope = $this->executeQuery( + $doctrineEnvelopes = $this->executeQuery( $sql, $query->getParameters(), $query->getParameterTypes() - )->fetchAssociative(); + )->fetchAllAssociative(); - if (false === $doctrineEnvelope) { + if ([] === $doctrineEnvelopes) { $this->driverConnection->commit(); $this->queueEmptiedAt = microtime(true) * 1000; @@ -197,23 +198,41 @@ class Connection implements ResetInterface // We need to be sure to empty the queue before blocking again $this->queueEmptiedAt = null; - $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope); + $doctrineEnvelopes = array_map($this->decodeEnvelopeHeaders(...), $doctrineEnvelopes); - $queryBuilder = $this->driverConnection->createQueryBuilder() - ->update($this->configuration['table_name']) - ->set('delivered_at', '?') - ->where('id = ?'); $now = new \DateTimeImmutable('UTC'); - $this->executeStatement($queryBuilder->getSQL(), [ - $now, - $doctrineEnvelope['id'], - ], [ - Types::DATETIME_IMMUTABLE, - ]); + + if (1 === \count($doctrineEnvelopes)) { + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->update($this->configuration['table_name']) + ->set('delivered_at', '?') + ->where('id = ?'); + + $this->executeStatement($queryBuilder->getSQL(), [ + $now, + $doctrineEnvelopes[0]['id'], + ], [ + Types::DATETIME_IMMUTABLE, + ]); + } else { + $ids = array_column($doctrineEnvelopes, 'id'); + $queryBuilder = $this->driverConnection->createQueryBuilder() + ->update($this->configuration['table_name']) + ->set('delivered_at', '?') + ->where('id IN (?)'); + + $this->executeStatement($queryBuilder->getSQL(), [ + $now, + $ids, + ], [ + Types::DATETIME_IMMUTABLE, + ArrayParameterType::STRING, + ]); + } $this->driverConnection->commit(); - return $doctrineEnvelope; + return $doctrineEnvelopes; } catch (\Throwable $e) { $this->driverConnection->rollBack(); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php index c8d13567701..b92a69d4933 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php @@ -40,10 +40,15 @@ class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareIn $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { + $fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1; + try { - $doctrineEnvelope = $this->connection->get(); + $doctrineEnvelopes = $this->connection->get($fetchSize); $this->retryingSafetyCounter = 0; // reset counter } catch (RetryableException $exception) { // Do nothing when RetryableException occurs less than "MAX_RETRIES" @@ -59,11 +64,11 @@ class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareIn throw new TransportException($exception->getMessage(), 0, $exception); } - if (null === $doctrineEnvelope) { + if (null === $doctrineEnvelopes) { return []; } - return [$this->createEnvelopeFromData($doctrineEnvelope)]; + return array_map($this->createEnvelopeFromData(...), $doctrineEnvelopes); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php index f77eea32fb9..00972e1f88f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php @@ -36,9 +36,14 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa ) { } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - return $this->getReceiver()->get(); + $fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1; + + return $this->getReceiver()->get($fetchSize); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php index 9099651e17b..fc63e6d1dcd 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php @@ -62,10 +62,10 @@ final class PostgreSqlConnection extends Connection $this->unlisten(); } - public function get(): ?array + public function get(int $fetchSize = 1): ?array { if ($this->notifyHandledExternally || null === $this->queueEmptiedAt) { - return parent::get(); + return parent::get($fetchSize); } // Fallback: when no external listener handles LISTEN/NOTIFY, @@ -92,7 +92,7 @@ final class PostgreSqlConnection extends Connection return null; } - return parent::get(); + return parent::get($fetchSize); } /** diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php index 766cf9dcdce..67b4425ca3a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php @@ -161,6 +161,33 @@ class ConnectionTest extends TestCase $this->assertNotNull($connection->get()); } + public function testGetUsesFetchSizeWhenProvided() + { + $redis = $this->createRedisMock(); + + $redis->expects($this->once())->method('xreadgroup') + ->willReturnCallback(function (...$args) { + static $series = [ + [['symfony', 'consumer', ['queue' => '0'], 5, 1], ['queue' => [ + '1-0' => ['message' => json_encode(['body' => 'First', 'headers' => []])], + '2-0' => ['message' => json_encode(['body' => 'Second', 'headers' => []])], + ]]], + ]; + + [$expectedArgs, $return] = array_shift($series); + $this->assertSame($expectedArgs, $args); + + return $return; + }) + ; + + $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); + $messages = $connection->get(5); + + $this->assertSame('1-0', $messages[0]['id']); + $this->assertSame('2-0', $messages[1]['id']); + } + #[DataProvider('provideAuthDsn')] public function testAuth(string|array $expected, string $dsn) { @@ -330,15 +357,13 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('redis://localhost/queue', [], $redis); $message = $connection->get(); + $this->assertSame(0, $message[0]['id']); $this->assertSame([ - 'id' => 0, - 'data' => [ - 'message' => json_encode([ - 'body' => '1', - 'headers' => [], - ]), - ], - ], $message); + 'message' => json_encode([ + 'body' => '1', + 'headers' => [], + ]), + ], $message[0]['data']); } public function testClaimAbandonedMessageWithRaceCondition() diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php index 590b41ad0be..e14e2d4ae0d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php @@ -56,7 +56,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => '{"message": "Hi"}', 'headers' => ['type' => DummyMessage::class], ]), - ], $message['data']); + ], $message[0]['data']); } public function testGetTheFirstAvailableMessage() @@ -69,14 +69,14 @@ class RedisExtIntegrationTest extends TestCase 'body' => '{"message": "Hi1"}', 'headers' => ['type' => DummyMessage::class], ]), - ], $message['data']); + ], $message[0]['data']); $message = $this->connection->get(); $this->assertEquals([ 'message' => json_encode([ 'body' => '{"message": "Hi2"}', 'headers' => ['type' => DummyMessage::class], ]), - ], $message['data']); + ], $message[0]['data']); } public function testConnectionSendWithSameContent() @@ -93,7 +93,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body, 'headers' => $headers, ]), - ], $message['data']); + ], $message[0]['data']); $message = $this->connection->get(); $this->assertEquals([ @@ -101,7 +101,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body, 'headers' => $headers, ]), - ], $message['data']); + ], $message[0]['data']); } public function testConnectionSendAndGetDelayed() @@ -116,7 +116,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => '{"message": "Hi"}', 'headers' => ['type' => DummyMessage::class], ]), - ], $message['data']); + ], $message[0]['data']); } public function testConnectionSendDelayedMessagesWithSameContent() @@ -133,7 +133,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body, 'headers' => $headers, ]), - ], $message['data']); + ], $message[0]['data']); $message = $this->connection->get(); $this->assertEquals([ @@ -141,7 +141,7 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body, 'headers' => $headers, ]), - ], $message['data']); + ], $message[0]['data']); } public function testConnectionBelowRedeliverTimeout() @@ -206,8 +206,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body1, 'headers' => $headers, ]), - ], $message['data']); - $connection->ack($message['id']); + ], $message[0]['data']); + $connection->ack($message[0]['id']); // Queue will return the second message $message = $connection->get(); @@ -216,8 +216,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => $body2, 'headers' => $headers, ]), - ], $message['data']); - $connection->ack($message['id']); + ], $message[0]['data']); + $connection->ack($message[0]['id']); } #[DataProvider('sentinelOptionNames')] @@ -245,8 +245,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => '1', 'headers' => [], ]), - ], $message['data']); - $connection->reject($message['id']); + ], $message[0]['data']); + $connection->reject($message[0]['id']); $connection->cleanup(); } @@ -282,8 +282,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => '1', 'headers' => [], ]), - ], $message['data']); - $connection->reject($message['id']); + ], $message[0]['data']); + $connection->reject($message[0]['id']); $connection->cleanup(); } @@ -300,8 +300,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => '1', 'headers' => [], ]), - ], $message['data']); - $connection->reject($message['id']); + ], $message[0]['data']); + $connection->reject($message[0]['id']); $connection->cleanup(); } @@ -319,8 +319,8 @@ class RedisExtIntegrationTest extends TestCase 'body' => '1', 'headers' => [], ]), - ], $message['data']); - $connection->reject($message['id']); + ], $message[0]['data']); + $connection->reject($message[0]['id']); } finally { $redis->unlink('messenger-lazy'); } @@ -373,7 +373,7 @@ class RedisExtIntegrationTest extends TestCase $this->assertNull($connection->get()); // no message, should return null immediately $connection->add('1', []); $this->assertNotEmpty($message = $connection->get()); - $connection->reject($message['id']); + $connection->reject($message[0]['id']); } finally { $redis->unlink('messenger-getnonblocking'); } @@ -390,7 +390,7 @@ class RedisExtIntegrationTest extends TestCase $connection->add('2', []); $failing = $connection->get(); - $connection->reject($failing['id']); + $connection->reject($failing[0]['id']); $connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['sentinel' => null], $redis); $this->assertNotNull($connection->get()); @@ -435,12 +435,12 @@ class RedisExtIntegrationTest extends TestCase $this->assertSame(3, $this->connection->getMessageCount()); $message = $this->connection->get(); - $this->connection->ack($message['id']); + $this->connection->ack($message[0]['id']); $this->assertSame(2, $this->connection->getMessageCount()); $message = $this->connection->get(); - $this->connection->reject($message['id']); + $this->connection->reject($message[0]['id']); $this->assertSame(1, $this->connection->getMessageCount()); } diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisReceiverTest.php index 42b338c5e03..5a19cb12a24 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisReceiverTest.php @@ -35,7 +35,7 @@ class RedisReceiverTest extends TestCase public function testItReturnsTheDecodedMessageToTheHandler(array $redisEnvelope, $expectedMessage, SerializerInterface $serializer) { $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($redisEnvelope); + $connection->method('get')->willReturn([$redisEnvelope]); $receiver = new RedisReceiver($connection, $serializer); $actualEnvelopes = $receiver->get(); @@ -50,6 +50,71 @@ class RedisReceiverTest extends TestCase $this->assertSame($redisEnvelope['id'], $transportMessageIdStamp->getId()); } + public function testGetUsesFetchSizeWhenProvided() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + $redisEnvelope = [ + 'id' => 1, + 'data' => [ + 'message' => json_encode([ + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]), + ], + ]; + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('get')->with(7)->willReturn([$redisEnvelope]); + + $receiver = new RedisReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(7); + + $this->assertCount(1, $actualEnvelopes); + } + + public function testItReturnsMultipleDecodedMessagesWhenAvailable() + { + $connection = $this->createStub(Connection::class); + $connection->method('get')->willReturn([ + [ + 'id' => '1', + 'data' => [ + 'message' => json_encode([ + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]), + ], + ], + [ + 'id' => '2', + 'data' => [ + 'message' => json_encode([ + 'body' => '{"message": "Hello"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]), + ], + ], + ]); + + $receiver = new RedisReceiver($connection, new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + )); + + $envelopes = $receiver->get(2); + + $this->assertCount(2, $envelopes); + $this->assertEquals(new DummyMessage('Hi'), $envelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hello'), $envelopes[1]->getMessage()); + } + #[DataProvider('rejectedRedisEnvelopeProvider')] public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException(array $redisEnvelope) { @@ -57,7 +122,7 @@ class RedisReceiverTest extends TestCase $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); $connection = $this->createStub(Connection::class); - $connection->method('get')->willReturn($redisEnvelope); + $connection->method('get')->willReturn([$redisEnvelope]); $receiver = new RedisReceiver($connection, $serializer); $envelopes = $receiver->get(); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportTest.php index 35fa87e82f9..b9f739db12d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportTest.php @@ -49,7 +49,7 @@ class RedisTransportTest extends TestCase ]; $serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); - $connection->method('get')->willReturn($redisEnvelope); + $connection->method('get')->willReturn([$redisEnvelope]); $envelopes = $transport->get(); $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php index cbe2822ee29..f60ba607bfd 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php @@ -69,6 +69,8 @@ class Connection private bool $deleteAfterAck; private bool $deleteAfterReject; private bool $couldHavePendingMessages = true; + /** @var list */ + private array $buffer = []; public function __construct(array $options, \Redis|Relay|\RedisCluster|null $redis = null) { @@ -426,11 +428,17 @@ class Connection $this->nextClaim = microtime(true) + $this->claimInterval; } - public function get(): ?array + /** + * @return list|null + */ + public function get(int $fetchSize = 1): ?array { + $fetchSize = max(1, $fetchSize); + if ($this->autoSetup) { $this->setup(); } + $now = microtime(); $now = substr($now, 11).substr($now, 2, 3); @@ -462,48 +470,78 @@ class Connection $this->claimOldPendingMessages(); } - $messageId = '>'; // will receive new messages + $messages = $this->getPendingMessages($fetchSize); - if ($this->couldHavePendingMessages) { - $messageId = '0'; // will receive consumers pending messages + if (\count($messages) >= $fetchSize) { + return $messages; } + $redis = $this->getRedis(); - try { - $messages = $redis->xreadgroup( - $this->group, - $this->consumer, - [$this->stream => $messageId], - 1, - 1 - ); - } catch (\RedisException|\Relay\Exception $e) { - throw new TransportException($e->getMessage(), 0, $e); - } - - if (false === $messages) { - if ($error = $redis->getLastError() ?: null) { - $redis->clearLastError(); + while (true) { + if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) { + $this->claimOldPendingMessages(); } - throw new TransportException($error ?? 'Could not read messages from the redis stream.'); + $messageId = $this->couldHavePendingMessages ? '0' : '>'; + + try { + $streamMessages = $redis->xreadgroup( + $this->group, + $this->consumer, + [$this->stream => $messageId], + $fetchSize, + 1 + ); + } catch (\RedisException|\Relay\Exception $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + if (false === $streamMessages) { + if ($error = $redis->getLastError() ?: null) { + $redis->clearLastError(); + } + + throw new TransportException($error ?? 'Could not read messages from the redis stream.'); + } + + if ($this->couldHavePendingMessages && empty($streamMessages[$this->stream])) { + $this->couldHavePendingMessages = false; + + continue; + } + + foreach ($streamMessages[$this->stream] ?? [] as $key => $message) { + $this->buffer[] = [ + 'id' => $key, + 'data' => $message, + ]; + } + + break; } - if ($this->couldHavePendingMessages && empty($messages[$this->stream])) { - $this->couldHavePendingMessages = false; + $messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))]; - // No pending messages so get a new one - return $this->get(); + if (!$messages) { + return null; } - foreach ($messages[$this->stream] ?? [] as $key => $message) { - return [ - 'id' => $key, - 'data' => $message, - ]; + return $messages; + } + + /** + * @return list + */ + private function getPendingMessages(int $fetchSize): array + { + $messages = []; + + while ($fetchSize-- > 0 && $this->buffer) { + $messages[] = array_shift($this->buffer); } - return null; + return $messages; } public function ack(string $id): void diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php index 15a1e4ff4b2..3b9d748c9ea 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php @@ -37,51 +37,67 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - if (null === $message = $this->connection->get()) { + $fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1; + + if (null === $messages = $this->connection->get($fetchSize)) { return []; } - if (null === $message['data']) { - try { - $this->connection->reject($message['id']); - } catch (TransportException $e) { - if ($e->getPrevious()) { - throw $e; + $envelopes = []; + $shouldRetry = false; + + foreach ($messages as $message) { + if (null === $message['data']) { + $shouldRetry = true; + + try { + $this->connection->reject($message['id']); + } catch (TransportException $e) { + if ($e->getPrevious()) { + throw $e; + } } + + continue; } - return $this->get(); - } - - if (null === $redisEnvelope = json_decode($message['data']['message'] ?? '', true)) { - return []; - } - - $stamps = [ - new RedisReceivedStamp($message['id']), - new TransportMessageIdStamp($message['id']), - ]; - - try { - if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) { - $envelope = $this->serializer->decode($redisEnvelope = [ - 'body' => $redisEnvelope['body'], - 'headers' => $redisEnvelope['headers'], - ]); - } else { - $envelope = $this->serializer->decode($redisEnvelope); + if (null === $redisEnvelope = json_decode($message['data']['message'] ?? '', true)) { + continue; } - } catch (MessageDecodingFailedException $e) { - return [ - MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps), + + $stamps = [ + new RedisReceivedStamp($message['id']), + new TransportMessageIdStamp($message['id']), ]; + + try { + if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) { + $envelope = $this->serializer->decode($redisEnvelope = [ + 'body' => $redisEnvelope['body'], + 'headers' => $redisEnvelope['headers'], + ]); + } else { + $envelope = $this->serializer->decode($redisEnvelope); + } + } catch (MessageDecodingFailedException $e) { + $envelopes[] = MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps); + + continue; + } + + $envelopes[] = $envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps); } - return [ - $envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps), - ]; + if (!$envelopes && $shouldRetry) { + return $this->get($fetchSize); + } + + return $envelopes; } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php index 495ee29fb60..34904b96c3d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php @@ -37,9 +37,14 @@ class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, $this->serializer = $serializer ?? new PhpSerializer(); } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { - return $this->getReceiver()->get(); + $fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1; + + return $this->getReceiver()->get($fetchSize); } public function ack(Envelope $envelope): void diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 87b840184a6..6f178914d71 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -8,6 +8,8 @@ CHANGELOG * Receivers no longer delete messages on decode failure; they are routed through the normal retry/failure transport path * Add regex support for transport name patterns in the `messenger:consume` command * Add an idle timeout option to the `BatchHandlerTrait` + * Add argument `$fetchSize` to `ReceiverInterface::get()` and `QueueReceiverInterface::getFromQueues()`, and to all bridges + * Add a `--fetch-size` option to the `messenger:consume` command to control how many messages are fetched per iteration 8.0 --- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a3811ae775e..9f788f5742e 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -78,6 +78,7 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'), new InputOption('exclude-receivers', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific receivers/transports from consumption (can only be used with --all)'), new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL), + new InputOption('fetch-size', null, InputOption::VALUE_REQUIRED, 'The number of messages to fetch per call to the transport', 1), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -132,6 +133,10 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa Use the --exclude-receivers option to exclude specific receivers/transports from consumption (can only be used with --all): php %command.full_name% --all --exclude-receivers= + + Use the --fetch-size option to control how many messages are fetched per call to the transport: + + php %command.full_name% --fetch-size=8 EOF ) ; @@ -296,6 +301,12 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa $options['queues'] = $queues; } + if (1 < $fetchSize = (int) $input->getOption('fetch-size')) { + throw new \InvalidArgumentException(\sprintf('The "--fetch-size" option must be a positive integer, "%s" given.', $input->getOption('fetch-size'))); + } + + $options['fetch_size'] = $fetchSize; + try { $this->worker->run($options); } finally { diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyReceiver.php index 555730145e0..3f00207a820 100644 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyReceiver.php +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyReceiver.php @@ -20,6 +20,7 @@ class DummyReceiver implements ReceiverInterface private array $rejectedEnvelopes = []; private int $acknowledgeCount = 0; private int $rejectCount = 0; + private array $fetchSizes = []; /** * @param Envelope[][] $deliveriesOfEnvelopes @@ -29,8 +30,13 @@ class DummyReceiver implements ReceiverInterface ) { } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { + $this->fetchSizes[] = \func_num_args() > 0 ? func_get_arg(0) : 1; + $val = array_shift($this->deliveriesOfEnvelopes); return $val ?? []; @@ -67,4 +73,9 @@ class DummyReceiver implements ReceiverInterface { return $this->rejectedEnvelopes; } + + public function getFetchSizes(): array + { + return $this->fetchSizes; + } } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php index 4c044d31a42..398b415d8d8 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php @@ -69,7 +69,7 @@ class InMemoryTransportTest extends TestCase $envelope1 = $this->transport->send($envelope1); $envelope2 = new Envelope(new \stdClass()); $envelope2 = $this->transport->send($envelope2); - $this->assertSame([$envelope1, $envelope2], $this->transport->get()); + $this->assertSame([$envelope1, $envelope2], $this->transport->get(2)); $this->transport->ack($envelope1); $this->assertSame([$envelope2], $this->transport->get()); $this->transport->reject($envelope2); @@ -109,13 +109,22 @@ class InMemoryTransportTest extends TestCase $this->assertSame([$envelopeDecoded], $serializeTransport->get()); } + public function testGetUsesFetchSizeWhenProvided() + { + $envelope1 = $this->transport->send(new Envelope(new \stdClass())); + $envelope2 = $this->transport->send(new Envelope(new \stdClass())); + + $this->assertSame([$envelope1], $this->transport->get(1)); + $this->assertSame([$envelope1, $envelope2], $this->transport->get(2)); + } + public function testAcknowledgeSameMessageWithDifferentStamps() { $envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]); $envelope1 = $this->transport->send($envelope1); $envelope2 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]); $envelope2 = $this->transport->send($envelope2); - $this->assertSame([$envelope1, $envelope2], $this->transport->get()); + $this->assertSame([$envelope1, $envelope2], $this->transport->get(2)); $this->transport->ack($envelope1->with(new AnEnvelopeStamp())); $this->assertSame([$envelope2], $this->transport->get()); $this->transport->reject($envelope2->with(new AnEnvelopeStamp())); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 7132781df0a..7153309c732 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -373,21 +373,47 @@ class WorkerTest extends TestCase public function testWorkerLimitQueues() { $envelope = [new Envelope(new DummyMessage('message1'))]; - $receiver = $this->createMock(QueueReceiverInterface::class); - $receiver->expects($this->once()) - ->method('getFromQueues') - ->with(['foo']) - ->willReturn($envelope) - ; - $receiver->expects($this->never()) - ->method('get') - ; + $receiver = new DummyQueueReceiver([$envelope]); $dispatcher = new EventDispatcher(); $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); $worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock()); $worker->run(['queues' => ['foo']]); + + $this->assertSame([['foo']], $receiver->queueNames); + $this->assertSame([1], $receiver->getFromQueuesFetchSizes); + } + + public function testWorkerPassesFetchSizeToReceiver() + { + $receiver = new DummyReceiver([ + [new Envelope(new DummyMessage('message1'))], + ]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock()); + $worker->run(['fetch_size' => 7]); + + $this->assertSame([7], $receiver->getFetchSizes()); + } + + public function testWorkerPassesFetchSizeToQueueReceiver() + { + $receiver = new DummyQueueReceiver([ + [new Envelope(new DummyMessage('message1'))], + ]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock()); + $worker->run(['queues' => ['foo'], 'fetch_size' => 7]); + + $this->assertSame([['foo']], $receiver->queueNames); + $this->assertSame([7], $receiver->getFromQueuesFetchSizes); } public function testWorkerLimitQueuesUnsupported() @@ -717,7 +743,7 @@ class WorkerTest extends TestCase $middleware = new HandleMessageMiddleware(new HandlersLocator([ DummyMessage::class => [new HandlerDescriptor($batchHandler)], - SecondHandlerDummyMessage::class => [new HandlerDescriptor(function (SecondHandlerDummyMessage $message) {})], + SecondHandlerDummyMessage::class => [new HandlerDescriptor(static function (SecondHandlerDummyMessage $message) {})], ])); $bus = new MessageBus([$middleware]); @@ -924,9 +950,18 @@ class WorkerTest extends TestCase class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface { - public function getFromQueues(array $queueNames): iterable + public array $queueNames = []; + public array $getFromQueuesFetchSizes = []; + + /** + * @param int $fetchSize + */ + public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable { - return $this->get(); + $this->queueNames[] = $queueNames; + $this->getFromQueuesFetchSizes[] = 1 < \func_num_args() ? func_get_arg(1) : 1; + + return $this->get(...\func_num_args() > 1 ? [func_get_arg(1)] : []); } } diff --git a/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php b/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php index 35d9e8de59a..dc99b3acf16 100644 --- a/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php +++ b/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php @@ -56,13 +56,20 @@ class InMemoryTransport implements TransportInterface, ResetInterface ) { } - public function get(): iterable + /** + * @param int $fetchSize Best-effort hint about how many messages can be received in one call + */ + public function get(/* int $fetchSize = 1 */): iterable { + $fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1; $envelopes = []; $now = $this->clock?->now() ?? new \DateTimeImmutable(); foreach ($this->decode($this->queue) as $id => $envelope) { if (!isset($this->availableAt[$id]) || $now > $this->availableAt[$id]) { $envelopes[] = $envelope; + if (\count($envelopes) >= $fetchSize) { + break; + } } } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php index 1886afebb8c..97b6baa375b 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php @@ -24,8 +24,9 @@ interface QueueReceiverInterface extends ReceiverInterface * Get messages from the specified queue names instead of consuming from all queues. * * @param string[] $queueNames + * @param int $fetchSize Best-effort hint about how many messages can be received in one call * * @return Envelope[] */ - public function getFromQueues(array $queueNames): iterable; + public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable; } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php index 4cf7a6d84fc..64939db8c2a 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php @@ -39,11 +39,13 @@ interface ReceiverInterface * an Envelope containing a MessageDecodingFailedException so the worker * can route it through the usual failure handling path. * + * @param int $fetchSize Best-effort hint about how many messages can be received in one call + * * @return iterable * * @throws TransportException If there is an issue communicating with the transport */ - public function get(): iterable; + public function get(/* int $fetchSize = 1 */): iterable; /** * Acknowledges that the passed message was handled. diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php index 1e1027b6a42..f0f53b13fe7 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php @@ -30,7 +30,10 @@ class SingleMessageReceiver implements ReceiverInterface ) { } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { if ($this->hasReceived) { return []; diff --git a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php index f7abc3ba932..f14fb3d74bd 100644 --- a/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php +++ b/src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php @@ -30,7 +30,10 @@ class SyncTransport implements TransportInterface ) { } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.'); } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 486423c5c3d..87cd2285948 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -100,11 +100,14 @@ class Worker while (!$this->shouldStop) { $envelopeHandled = false; $envelopeHandledStart = $this->clock->now(); + $fetchSize = max(1, $options['fetch_size'] ?? 1); + foreach ($this->receivers as $transportName => $receiver) { if ($queueNames) { - $envelopes = $receiver->getFromQueues($queueNames); + /** @var QueueReceiverInterface $receiver */ + $envelopes = $receiver->getFromQueues($queueNames, $fetchSize); } else { - $envelopes = $receiver->get(); + $envelopes = $receiver->get($fetchSize); } foreach ($envelopes as $envelope) { diff --git a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php index aea2cf669a4..5072edf809d 100644 --- a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php +++ b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php @@ -24,7 +24,10 @@ class SchedulerTransport implements TransportInterface ) { } - public function get(): iterable + /** + * @param int $fetchSize + */ + public function get(/* int $fetchSize = 1 */): iterable { foreach ($this->messageGenerator->getMessages() as $context => $message) { $stamp = new ScheduledStamp($context);