[Messenger] Add a --fetch-size option to the messenger:consume command to control how many messages are fetched per iteration

This commit is contained in:
Nicolas Grekas
2026-03-12 21:19:40 +01:00
parent 3f0ca55a3d
commit e4536ac7ef
43 changed files with 723 additions and 254 deletions

View File

@@ -94,6 +94,7 @@ Messenger
custom serializers that still throw are supported via a BC fallback in receivers
* Receivers no longer delete messages from the queue on decode failure;
they are routed through the normal retry/failure transport path instead
* Add argument `$fetchSize` to `ReceiverInterface::get()` and `QueueReceiverInterface::getFromQueues()`
Security
--------

View File

@@ -53,14 +53,14 @@ class AmazonSqsIntegrationTest extends TestCase
usleep(5000);
}
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
$this->assertEquals('{"message": "Hi"}', $encoded[0]['body']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded[0]['headers']);
$this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt);
$connection->keepalive($encoded['id']);
$connection->keepalive($encoded[0]['id']);
$this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt);
$this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended');
$connection->delete($encoded['id']);
$connection->delete($encoded[0]['id']);
}
private function waitUntilElapsed(float $seconds, float $since): void

View File

@@ -32,7 +32,7 @@ class AmazonSqsReceiverTest extends TestCase
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($sqsEnvelop);
$connection->method('get')->willReturn([$sqsEnvelop]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
@@ -40,6 +40,44 @@ class AmazonSqsReceiverTest extends TestCase
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testGetUsesFetchSizeWhenProvided()
{
$serializer = $this->createSerializer();
$sqsEnvelope = $this->createSqsEnvelope();
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('get')->with(7)->willReturn([$sqsEnvelope]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get(7));
$this->assertCount(1, $actualEnvelopes);
}
public function testItReturnsMultipleDecodedMessagesWhenAvailable()
{
$serializer = $this->createSerializer();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn([
$this->createSqsEnvelope(),
[
'id' => 2,
'body' => '{"message": "Hello"}',
'headers' => [
'type' => DummyMessage::class,
],
],
]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get(2));
$this->assertCount(2, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hello'), $actualEnvelopes[1]->getMessage());
}
public function testItReturnsSerializedEnvelopeWhenDecodingFails()
{
$serializer = $this->createStub(PhpSerializer::class);
@@ -47,7 +85,7 @@ class AmazonSqsReceiverTest extends TestCase
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($sqsEnvelop);
$connection->method('get')->willReturn([$sqsEnvelop]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$envelopes = iterator_to_array($receiver->get());

View File

@@ -54,7 +54,7 @@ class AmazonSqsTransportTest extends TestCase
];
$serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($sqsEnvelope);
$connection->method('get')->willReturn([$sqsEnvelope]);
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

View File

@@ -280,6 +280,48 @@ class ConnectionTest extends TestCase
$this->assertNull($connection->get());
}
public function testGetUsesMaxOfFetchSizeAndConfiguredBufferSize()
{
$client = $this->createMock(SqsClient::class);
$client
->method('getQueueUrl')
->willReturnMap([
[['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123], ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'])],
]);
$firstResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
]]);
$secondResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => []]);
$series = [
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'VisibilityTimeout' => null,
'MaxNumberOfMessages' => 12,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 20]], $firstResult],
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'VisibilityTimeout' => null,
'MaxNumberOfMessages' => 12,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 20]], $secondResult],
];
$client->expects($this->exactly(2))
->method('receiveMessage')
->willReturnCallback(function (...$args) use (&$series) {
[$expectedArgs, $return] = array_shift($series);
$this->assertSame($expectedArgs, $args);
return $return;
})
;
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false, 'buffer_size' => 9], $client);
$this->assertNotNull($connection->get(12));
$this->assertNull($connection->get(12));
}
public function testUnexpectedSqsError()
{
$this->expectException(HttpException::class);

View File

@@ -36,28 +36,35 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
try {
if (!$sqsEnvelope = $this->connection->get()) {
if (!$sqsEnvelopes = $this->connection->get($fetchSize)) {
return;
}
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$stamps = [
new AmazonSqsReceivedStamp($sqsEnvelope['id']),
new TransportMessageIdStamp($sqsEnvelope['id']),
];
foreach ($sqsEnvelopes as $sqsEnvelope) {
$stamps = [
new AmazonSqsReceivedStamp($sqsEnvelope['id']),
new TransportMessageIdStamp($sqsEnvelope['id']),
];
try {
yield $this->serializer->decode($sqsEnvelope = [
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
])->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
try {
yield $this->serializer->decode($sqsEnvelope = [
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
])->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
}
}
}

View File

@@ -43,9 +43,14 @@ class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterfa
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -99,7 +99,7 @@ class Connection
* * access_key: AWS access key
* * secret_key: AWS secret key
* * session_token: AWS session token (required only when using temporary credentials)
* * buffer_size: number of messages to prefetch (Default: 9)
* * buffer_size: number of messages to prefetch (Default: 9, Max: 10)
* * wait_time: long polling duration in seconds (Default: 20)
* * poll_timeout: amount of seconds the transport should wait for new message
* * visibility_timeout: amount of seconds the message won't be visible
@@ -188,61 +188,58 @@ class Connection
return new self($configuration, new SqsClient($clientConfiguration, null, $client, $logger), $queueUrl);
}
public function get(): ?array
public function get(int $fetchSize = 1): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
foreach ($this->getNextMessages() as $message) {
return $message;
$fetchSize = max(1, $fetchSize);
$messages = $this->getPendingMessages($fetchSize);
if (\count($messages) < $fetchSize
&& $this->fetchMessages(max($fetchSize, $this->configuration['buffer_size']))
) {
$messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))];
}
return null;
return $messages ?: null;
}
/**
* @return \Generator<int, array>
* @return list<array>
*/
private function getNextMessages(): \Generator
private function getPendingMessages(int $fetchSize): array
{
yield from $this->getPendingMessages();
yield from $this->getNewMessages();
}
$messages = [];
/**
* @return \Generator<int, array>
*/
private function getPendingMessages(): \Generator
{
while ($this->buffer) {
yield array_shift($this->buffer);
while ($fetchSize-- > 0 && $this->buffer) {
$messages[] = array_shift($this->buffer);
}
return $messages;
}
/**
* @return \Generator<int, array>
*/
private function getNewMessages(): \Generator
private function fetchMessages(int $fetchSize): bool
{
if (null === $this->currentResponse) {
$this->currentResponse = $this->client->receiveMessage([
'QueueUrl' => $this->getQueueUrl(),
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MaxNumberOfMessages' => min($fetchSize, 10), // SQS limitation
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
if (!$this->fetchMessage()) {
return;
if (!$this->fetchPendingMessages()) {
return false;
}
yield from $this->getPendingMessages();
return true;
}
private function fetchMessage(): bool
private function fetchPendingMessages(): bool
{
if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) {
return false;
@@ -416,13 +413,13 @@ class Connection
{
if (null !== $this->currentResponse) {
// fetch current response in order to requeue in transit messages
if (!$this->fetchMessage()) {
if (!$this->fetchPendingMessages()) {
$this->currentResponse->cancel();
$this->currentResponse = null;
}
}
foreach ($this->getPendingMessages() as $message) {
foreach ($this->getPendingMessages(\count($this->buffer)) as $message) {
$this->client->changeMessageVisibility([
'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $message['id'],

View File

@@ -49,6 +49,39 @@ class AmqpReceiverTest extends TestCase
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testGetAcceptsFetchSize()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->createMock(Connection::class);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->expects($this->exactly(2))->method('get')->with('queueName')->willReturnOnConsecutiveCalls($amqpEnvelope, null);
$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get(7));
$this->assertCount(1, $actualEnvelopes);
}
public function testGetFromQueuesAcceptsFetchSize()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->createMock(Connection::class);
$connection->expects($this->exactly(2))->method('get')->with('queueName')->willReturnOnConsecutiveCalls($amqpEnvelope, null);
$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->getFromQueues(['queueName'], 7));
$this->assertCount(1, $actualEnvelopes);
}
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$this->expectException(TransportException::class);
@@ -158,10 +191,10 @@ class AmqpReceiverTest extends TestCase
$this->assertInstanceOf(MessageDecodingFailedException::class, $envelopes[0]->getMessage());
}
private function createAMQPEnvelope(?string $messageId = null): \AMQPEnvelope
private function createAMQPEnvelope(?string $messageId = null, string $body = '{"message": "Hi"}'): \AMQPEnvelope
{
$envelope = $this->createStub(\AMQPEnvelope::class);
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getBody')->willReturn($body);
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

View File

@@ -37,19 +37,52 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
yield from $this->getFromQueues($this->connection->getQueueNames());
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
yield from $this->getFromQueues($this->connection->getQueueNames(), $fetchSize);
}
public function getFromQueues(array $queueNames): iterable
/**
* @param int $fetchSize
*/
public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable
{
foreach ($queueNames as $queueName) {
yield from $this->getEnvelope($queueName);
$fetchSize = \func_num_args() > 1 ? max(1, func_get_arg(1)) : 1;
$remaining = $fetchSize;
$activeQueues = array_values($queueNames);
$firstRound = true;
while ($activeQueues && ($remaining > 0 || $firstRound)) {
$exhausted = [];
foreach ($activeQueues as $i => $queueName) {
if (null === $envelope = $this->getEnvelope($queueName)) {
$exhausted[] = $i;
continue;
}
yield $envelope;
--$remaining;
if ($remaining <= 0 && !$firstRound) {
return;
}
}
$firstRound = false;
foreach (array_reverse($exhausted) as $i) {
array_splice($activeQueues, $i, 1);
}
}
}
private function getEnvelope(string $queueName): iterable
private function getEnvelope(string $queueName): ?Envelope
{
try {
$amqpEnvelope = $this->connection->get($queueName);
@@ -68,7 +101,7 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
}
if (null === $amqpEnvelope) {
return;
return null;
}
$body = $amqpEnvelope->getBody();
@@ -78,13 +111,15 @@ class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
...($id ? [new TransportMessageIdStamp($id)] : []),
];
$data = [
'body' => false === $body ? '' : $body,
'headers' => $amqpEnvelope->getHeaders(),
];
try {
yield $this->serializer->decode($data = [
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' => $amqpEnvelope->getHeaders(),
])->withoutAll(TransportMessageIdStamp::class)->with(...$stamps);
return $this->serializer->decode($data)->withoutAll(TransportMessageIdStamp::class)->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($data, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
return MessageDecodingFailedException::wrap($data, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
}
}

View File

@@ -36,14 +36,24 @@ class AmqpTransport implements QueueReceiverInterface, TransportInterface, Setup
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function getFromQueues(array $queueNames): iterable
/**
* @param int $fetchSize
*/
public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->getFromQueues($queueNames);
$fetchSize = \func_num_args() > 1 ? func_get_arg(1) : 1;
return $this->getReceiver()->getFromQueues($queueNames, $fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -35,7 +35,10 @@ class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwar
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
if (!$beanstalkdEnvelope = $this->connection->get()) {
return;

View File

@@ -34,9 +34,14 @@ class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterf
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -43,7 +43,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderStub();
$driverConnection = $this->getDBALConnection();
$stmt = $this->getResultMock([
$stmt = $this->getGetResultMock([
'id' => 1,
'body' => '{"message":"Hi"}',
'headers' => json_encode(['type' => DummyMessage::class]),
@@ -70,16 +70,16 @@ class ConnectionTest extends TestCase
$connection = new Connection([], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
$this->assertEquals(1, $doctrineEnvelope[0]['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope[0]['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope[0]['headers']);
}
public function testGetWithNoPendingMessageWillReturnNull()
{
$queryBuilder = $this->getQueryBuilderStub();
$driverConnection = $this->getDBALConnection(true);
$stmt = $this->getResultMock(false);
$stmt = $this->getGetResultMock(false);
$queryBuilder
->method('getParameters')
@@ -108,7 +108,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnection(true);
$stmt = $this->getResultMock(false);
$stmt = $this->getGetResultMock(false);
$queryBuilder
->method('getParameters')
@@ -420,7 +420,18 @@ class ConnectionTest extends TestCase
return $queryBuilder;
}
private function getResultMock($expectedResult): Result&MockObject
private function getGetResultMock($expectedResult): Result&MockObject
{
$stmt = $this->createMock(Result::class);
$stmt->expects($this->once())
->method('fetchAllAssociative')
->willReturn(false === $expectedResult ? [] : [$expectedResult]);
return $stmt;
}
private function getFindResultMock($expectedResult): Result&MockObject
{
$stmt = $this->createMock(Result::class);
@@ -534,7 +545,7 @@ class ConnectionTest extends TestCase
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnection();
$id = 1;
$stmt = $this->getResultMock([
$stmt = $this->getFindResultMock([
'id' => $id,
'body' => '{"message":"Hi"}',
'headers' => json_encode(['type' => DummyMessage::class]),
@@ -624,7 +635,7 @@ class ConnectionTest extends TestCase
$driverConnection->method('createQueryBuilder')->willReturnCallback(static fn () => new QueryBuilder($driverConnection));
$result = $this->createStub(Result::class);
$result->method('fetchAssociative')->willReturn(false);
$result->method('fetchAllAssociative')->willReturn([]);
$driverConnection->expects($this->once())->method('beginTransaction');
$driverConnection

View File

@@ -46,8 +46,8 @@ class DoctrineIntegrationTest extends TestCase
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals('{"message": "Hi"}', $encoded[0]['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']);
}
public function testSendWithDelay()
@@ -117,7 +117,7 @@ class DoctrineIntegrationTest extends TestCase
]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
$this->assertEquals('{"message": "Hi available"}', $encoded[0]['body']);
}
public function testItCountMessages()
@@ -182,8 +182,8 @@ class DoctrineIntegrationTest extends TestCase
]);
$next = $this->connection->get();
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
$this->assertEquals('{"message": "Hi requeued"}', $next[0]['body']);
$this->connection->reject($next[0]['id']);
}
public function testTheTransportIsSetupOnGet()
@@ -194,7 +194,7 @@ class DoctrineIntegrationTest extends TestCase
$this->connection->send('the body', ['my' => 'header']);
$envelope = $this->connection->get();
$this->assertEquals('the body', $envelope['body']);
$this->assertEquals('the body', $envelope[0]['body']);
}
private function formatDateTime(\DateTimeImmutable $dateTime): string

View File

@@ -59,8 +59,8 @@ class DoctrinePostgreSqlIntegrationTest extends TestCase
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals('{"message": "Hi"}', $encoded[0]['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}
@@ -72,8 +72,8 @@ class DoctrinePostgreSqlIntegrationTest extends TestCase
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
$this->assertEquals('{"message": "Hi"}', $encoded[0]['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($connection->get());
}

View File

@@ -37,8 +37,8 @@ class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
$this->assertSame('{"message": "Hi"}', $encoded[0]['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}
@@ -50,8 +50,8 @@ class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
$this->assertSame('{"message": "Hi"}', $encoded[0]['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}

View File

@@ -37,8 +37,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
$this->assertSame('{"message": "Hi"}', $encoded[0]['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}
@@ -50,8 +50,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
$this->assertSame('{"message": "Hi"}', $encoded[0]['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}
@@ -64,8 +64,8 @@ class DoctrinePostgreSqlRegularIntegrationTest extends TestCase
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $connection->get();
$this->assertSame('{"message": "Hi"}', $encoded['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded['headers']);
$this->assertSame('{"message": "Hi"}', $encoded[0]['body']);
$this->assertSame(['type' => DummyMessage::class], $encoded[0]['headers']);
$this->assertNull($this->connection->get());
}

View File

@@ -38,7 +38,7 @@ class DoctrineReceiverTest extends TestCase
$doctrineEnvelope = $this->createDoctrineEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($doctrineEnvelope);
$connection->method('get')->willReturn([$doctrineEnvelope]);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
@@ -65,7 +65,7 @@ class DoctrineReceiverTest extends TestCase
$doctrineEnvelop = $this->createDoctrineEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($doctrineEnvelop);
$connection->method('get')->willReturn([$doctrineEnvelop]);
$receiver = new DoctrineReceiver($connection, $serializer);
$envelopes = $receiver->get();
@@ -100,7 +100,7 @@ class DoctrineReceiverTest extends TestCase
$doctrineEnvelope = $this->createRetriedDoctrineEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($doctrineEnvelope);
$connection->method('get')->willReturn([$doctrineEnvelope]);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
@@ -111,6 +111,20 @@ class DoctrineReceiverTest extends TestCase
$this->assertCount(1, $messageIdStamps);
}
public function testGetUsesFetchSizeWhenProvided()
{
$serializer = $this->createSerializer();
$doctrineEnvelope = $this->createDoctrineEnvelope();
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('get')->with(7)->willReturn([$doctrineEnvelope]);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get(7);
$this->assertCount(1, $actualEnvelopes);
}
public function testAll()
{
$serializer = $this->createSerializer();

View File

@@ -47,7 +47,7 @@ class DoctrineTransportTest extends TestCase
];
$serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($doctrineEnvelope);
$connection->method('get')->willReturn([$doctrineEnvelope]);
$envelopes = $transport->get();
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

View File

@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
@@ -154,14 +155,14 @@ class Connection implements ResetInterface
]);
}
public function get(): ?array
public function get(int $fetchSize = 1): ?array
{
get:
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
->orderBy('available_at', 'ASC')
->setMaxResults(1);
->setMaxResults($fetchSize);
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query->select('m.id');
@@ -181,13 +182,13 @@ class Connection implements ResetInterface
$sql = $this->addLockMode($query, $sql);
$doctrineEnvelope = $this->executeQuery(
$doctrineEnvelopes = $this->executeQuery(
$sql,
$query->getParameters(),
$query->getParameterTypes()
)->fetchAssociative();
)->fetchAllAssociative();
if (false === $doctrineEnvelope) {
if ([] === $doctrineEnvelopes) {
$this->driverConnection->commit();
$this->queueEmptiedAt = microtime(true) * 1000;
@@ -197,23 +198,41 @@ class Connection implements ResetInterface
// We need to be sure to empty the queue before blocking again
$this->queueEmptiedAt = null;
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
$doctrineEnvelopes = array_map($this->decodeEnvelopeHeaders(...), $doctrineEnvelopes);
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id = ?');
$now = new \DateTimeImmutable('UTC');
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$doctrineEnvelope['id'],
], [
Types::DATETIME_IMMUTABLE,
]);
if (1 === \count($doctrineEnvelopes)) {
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id = ?');
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$doctrineEnvelopes[0]['id'],
], [
Types::DATETIME_IMMUTABLE,
]);
} else {
$ids = array_column($doctrineEnvelopes, 'id');
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id IN (?)');
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$ids,
], [
Types::DATETIME_IMMUTABLE,
ArrayParameterType::STRING,
]);
}
$this->driverConnection->commit();
return $doctrineEnvelope;
return $doctrineEnvelopes;
} catch (\Throwable $e) {
$this->driverConnection->rollBack();

View File

@@ -40,10 +40,15 @@ class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareIn
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
try {
$doctrineEnvelope = $this->connection->get();
$doctrineEnvelopes = $this->connection->get($fetchSize);
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
@@ -59,11 +64,11 @@ class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareIn
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $doctrineEnvelope) {
if (null === $doctrineEnvelopes) {
return [];
}
return [$this->createEnvelopeFromData($doctrineEnvelope)];
return array_map($this->createEnvelopeFromData(...), $doctrineEnvelopes);
}
public function ack(Envelope $envelope): void

View File

@@ -36,9 +36,14 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa
) {
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -62,10 +62,10 @@ final class PostgreSqlConnection extends Connection
$this->unlisten();
}
public function get(): ?array
public function get(int $fetchSize = 1): ?array
{
if ($this->notifyHandledExternally || null === $this->queueEmptiedAt) {
return parent::get();
return parent::get($fetchSize);
}
// Fallback: when no external listener handles LISTEN/NOTIFY,
@@ -92,7 +92,7 @@ final class PostgreSqlConnection extends Connection
return null;
}
return parent::get();
return parent::get($fetchSize);
}
/**

View File

@@ -161,6 +161,33 @@ class ConnectionTest extends TestCase
$this->assertNotNull($connection->get());
}
public function testGetUsesFetchSizeWhenProvided()
{
$redis = $this->createRedisMock();
$redis->expects($this->once())->method('xreadgroup')
->willReturnCallback(function (...$args) {
static $series = [
[['symfony', 'consumer', ['queue' => '0'], 5, 1], ['queue' => [
'1-0' => ['message' => json_encode(['body' => 'First', 'headers' => []])],
'2-0' => ['message' => json_encode(['body' => 'Second', 'headers' => []])],
]]],
];
[$expectedArgs, $return] = array_shift($series);
$this->assertSame($expectedArgs, $args);
return $return;
})
;
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$messages = $connection->get(5);
$this->assertSame('1-0', $messages[0]['id']);
$this->assertSame('2-0', $messages[1]['id']);
}
#[DataProvider('provideAuthDsn')]
public function testAuth(string|array $expected, string $dsn)
{
@@ -330,15 +357,13 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$message = $connection->get();
$this->assertSame(0, $message[0]['id']);
$this->assertSame([
'id' => 0,
'data' => [
'message' => json_encode([
'body' => '1',
'headers' => [],
]),
],
], $message);
'message' => json_encode([
'body' => '1',
'headers' => [],
]),
], $message[0]['data']);
}
public function testClaimAbandonedMessageWithRaceCondition()

View File

@@ -56,7 +56,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => '{"message": "Hi"}',
'headers' => ['type' => DummyMessage::class],
]),
], $message['data']);
], $message[0]['data']);
}
public function testGetTheFirstAvailableMessage()
@@ -69,14 +69,14 @@ class RedisExtIntegrationTest extends TestCase
'body' => '{"message": "Hi1"}',
'headers' => ['type' => DummyMessage::class],
]),
], $message['data']);
], $message[0]['data']);
$message = $this->connection->get();
$this->assertEquals([
'message' => json_encode([
'body' => '{"message": "Hi2"}',
'headers' => ['type' => DummyMessage::class],
]),
], $message['data']);
], $message[0]['data']);
}
public function testConnectionSendWithSameContent()
@@ -93,7 +93,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body,
'headers' => $headers,
]),
], $message['data']);
], $message[0]['data']);
$message = $this->connection->get();
$this->assertEquals([
@@ -101,7 +101,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body,
'headers' => $headers,
]),
], $message['data']);
], $message[0]['data']);
}
public function testConnectionSendAndGetDelayed()
@@ -116,7 +116,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => '{"message": "Hi"}',
'headers' => ['type' => DummyMessage::class],
]),
], $message['data']);
], $message[0]['data']);
}
public function testConnectionSendDelayedMessagesWithSameContent()
@@ -133,7 +133,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body,
'headers' => $headers,
]),
], $message['data']);
], $message[0]['data']);
$message = $this->connection->get();
$this->assertEquals([
@@ -141,7 +141,7 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body,
'headers' => $headers,
]),
], $message['data']);
], $message[0]['data']);
}
public function testConnectionBelowRedeliverTimeout()
@@ -206,8 +206,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body1,
'headers' => $headers,
]),
], $message['data']);
$connection->ack($message['id']);
], $message[0]['data']);
$connection->ack($message[0]['id']);
// Queue will return the second message
$message = $connection->get();
@@ -216,8 +216,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => $body2,
'headers' => $headers,
]),
], $message['data']);
$connection->ack($message['id']);
], $message[0]['data']);
$connection->ack($message[0]['id']);
}
#[DataProvider('sentinelOptionNames')]
@@ -245,8 +245,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => '1',
'headers' => [],
]),
], $message['data']);
$connection->reject($message['id']);
], $message[0]['data']);
$connection->reject($message[0]['id']);
$connection->cleanup();
}
@@ -282,8 +282,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => '1',
'headers' => [],
]),
], $message['data']);
$connection->reject($message['id']);
], $message[0]['data']);
$connection->reject($message[0]['id']);
$connection->cleanup();
}
@@ -300,8 +300,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => '1',
'headers' => [],
]),
], $message['data']);
$connection->reject($message['id']);
], $message[0]['data']);
$connection->reject($message[0]['id']);
$connection->cleanup();
}
@@ -319,8 +319,8 @@ class RedisExtIntegrationTest extends TestCase
'body' => '1',
'headers' => [],
]),
], $message['data']);
$connection->reject($message['id']);
], $message[0]['data']);
$connection->reject($message[0]['id']);
} finally {
$redis->unlink('messenger-lazy');
}
@@ -373,7 +373,7 @@ class RedisExtIntegrationTest extends TestCase
$this->assertNull($connection->get()); // no message, should return null immediately
$connection->add('1', []);
$this->assertNotEmpty($message = $connection->get());
$connection->reject($message['id']);
$connection->reject($message[0]['id']);
} finally {
$redis->unlink('messenger-getnonblocking');
}
@@ -390,7 +390,7 @@ class RedisExtIntegrationTest extends TestCase
$connection->add('2', []);
$failing = $connection->get();
$connection->reject($failing['id']);
$connection->reject($failing[0]['id']);
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['sentinel' => null], $redis);
$this->assertNotNull($connection->get());
@@ -435,12 +435,12 @@ class RedisExtIntegrationTest extends TestCase
$this->assertSame(3, $this->connection->getMessageCount());
$message = $this->connection->get();
$this->connection->ack($message['id']);
$this->connection->ack($message[0]['id']);
$this->assertSame(2, $this->connection->getMessageCount());
$message = $this->connection->get();
$this->connection->reject($message['id']);
$this->connection->reject($message[0]['id']);
$this->assertSame(1, $this->connection->getMessageCount());
}

View File

@@ -35,7 +35,7 @@ class RedisReceiverTest extends TestCase
public function testItReturnsTheDecodedMessageToTheHandler(array $redisEnvelope, $expectedMessage, SerializerInterface $serializer)
{
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($redisEnvelope);
$connection->method('get')->willReturn([$redisEnvelope]);
$receiver = new RedisReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
@@ -50,6 +50,71 @@ class RedisReceiverTest extends TestCase
$this->assertSame($redisEnvelope['id'], $transportMessageIdStamp->getId());
}
public function testGetUsesFetchSizeWhenProvided()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$redisEnvelope = [
'id' => 1,
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
];
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('get')->with(7)->willReturn([$redisEnvelope]);
$receiver = new RedisReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get(7);
$this->assertCount(1, $actualEnvelopes);
}
public function testItReturnsMultipleDecodedMessagesWhenAvailable()
{
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn([
[
'id' => '1',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
[
'id' => '2',
'data' => [
'message' => json_encode([
'body' => '{"message": "Hello"}',
'headers' => [
'type' => DummyMessage::class,
],
]),
],
],
]);
$receiver = new RedisReceiver($connection, new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
));
$envelopes = $receiver->get(2);
$this->assertCount(2, $envelopes);
$this->assertEquals(new DummyMessage('Hi'), $envelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hello'), $envelopes[1]->getMessage());
}
#[DataProvider('rejectedRedisEnvelopeProvider')]
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException(array $redisEnvelope)
{
@@ -57,7 +122,7 @@ class RedisReceiverTest extends TestCase
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($redisEnvelope);
$connection->method('get')->willReturn([$redisEnvelope]);
$receiver = new RedisReceiver($connection, $serializer);
$envelopes = $receiver->get();

View File

@@ -49,7 +49,7 @@ class RedisTransportTest extends TestCase
];
$serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($redisEnvelope);
$connection->method('get')->willReturn([$redisEnvelope]);
$envelopes = $transport->get();
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

View File

@@ -69,6 +69,8 @@ class Connection
private bool $deleteAfterAck;
private bool $deleteAfterReject;
private bool $couldHavePendingMessages = true;
/** @var list<array{id: string, data: mixed}> */
private array $buffer = [];
public function __construct(array $options, \Redis|Relay|\RedisCluster|null $redis = null)
{
@@ -426,11 +428,17 @@ class Connection
$this->nextClaim = microtime(true) + $this->claimInterval;
}
public function get(): ?array
/**
* @return list<array{id: string, data: mixed}>|null
*/
public function get(int $fetchSize = 1): ?array
{
$fetchSize = max(1, $fetchSize);
if ($this->autoSetup) {
$this->setup();
}
$now = microtime();
$now = substr($now, 11).substr($now, 2, 3);
@@ -462,48 +470,78 @@ class Connection
$this->claimOldPendingMessages();
}
$messageId = '>'; // will receive new messages
$messages = $this->getPendingMessages($fetchSize);
if ($this->couldHavePendingMessages) {
$messageId = '0'; // will receive consumers pending messages
if (\count($messages) >= $fetchSize) {
return $messages;
}
$redis = $this->getRedis();
try {
$messages = $redis->xreadgroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
1,
1
);
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (false === $messages) {
if ($error = $redis->getLastError() ?: null) {
$redis->clearLastError();
while (true) {
if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
$this->claimOldPendingMessages();
}
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
$messageId = $this->couldHavePendingMessages ? '0' : '>';
try {
$streamMessages = $redis->xreadgroup(
$this->group,
$this->consumer,
[$this->stream => $messageId],
$fetchSize,
1
);
} catch (\RedisException|\Relay\Exception $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (false === $streamMessages) {
if ($error = $redis->getLastError() ?: null) {
$redis->clearLastError();
}
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
}
if ($this->couldHavePendingMessages && empty($streamMessages[$this->stream])) {
$this->couldHavePendingMessages = false;
continue;
}
foreach ($streamMessages[$this->stream] ?? [] as $key => $message) {
$this->buffer[] = [
'id' => $key,
'data' => $message,
];
}
break;
}
if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
$this->couldHavePendingMessages = false;
$messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))];
// No pending messages so get a new one
return $this->get();
if (!$messages) {
return null;
}
foreach ($messages[$this->stream] ?? [] as $key => $message) {
return [
'id' => $key,
'data' => $message,
];
return $messages;
}
/**
* @return list<array{id: string, data: mixed}>
*/
private function getPendingMessages(int $fetchSize): array
{
$messages = [];
while ($fetchSize-- > 0 && $this->buffer) {
$messages[] = array_shift($this->buffer);
}
return null;
return $messages;
}
public function ack(string $id): void

View File

@@ -37,51 +37,67 @@ class RedisReceiver implements KeepaliveReceiverInterface, MessageCountAwareInte
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
if (null === $message = $this->connection->get()) {
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
if (null === $messages = $this->connection->get($fetchSize)) {
return [];
}
if (null === $message['data']) {
try {
$this->connection->reject($message['id']);
} catch (TransportException $e) {
if ($e->getPrevious()) {
throw $e;
$envelopes = [];
$shouldRetry = false;
foreach ($messages as $message) {
if (null === $message['data']) {
$shouldRetry = true;
try {
$this->connection->reject($message['id']);
} catch (TransportException $e) {
if ($e->getPrevious()) {
throw $e;
}
}
continue;
}
return $this->get();
}
if (null === $redisEnvelope = json_decode($message['data']['message'] ?? '', true)) {
return [];
}
$stamps = [
new RedisReceivedStamp($message['id']),
new TransportMessageIdStamp($message['id']),
];
try {
if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) {
$envelope = $this->serializer->decode($redisEnvelope = [
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
]);
} else {
$envelope = $this->serializer->decode($redisEnvelope);
if (null === $redisEnvelope = json_decode($message['data']['message'] ?? '', true)) {
continue;
}
} catch (MessageDecodingFailedException $e) {
return [
MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps),
$stamps = [
new RedisReceivedStamp($message['id']),
new TransportMessageIdStamp($message['id']),
];
try {
if (\array_key_exists('body', $redisEnvelope) && \array_key_exists('headers', $redisEnvelope)) {
$envelope = $this->serializer->decode($redisEnvelope = [
'body' => $redisEnvelope['body'],
'headers' => $redisEnvelope['headers'],
]);
} else {
$envelope = $this->serializer->decode($redisEnvelope);
}
} catch (MessageDecodingFailedException $e) {
$envelopes[] = MessageDecodingFailedException::wrap($redisEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
continue;
}
$envelopes[] = $envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps);
}
return [
$envelope->withoutAll(TransportMessageIdStamp::class)->with(...$stamps),
];
if (!$envelopes && $shouldRetry) {
return $this->get($fetchSize);
}
return $envelopes;
}
public function ack(Envelope $envelope): void

View File

@@ -37,9 +37,14 @@ class RedisTransport implements TransportInterface, KeepaliveReceiverInterface,
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -8,6 +8,8 @@ CHANGELOG
* Receivers no longer delete messages on decode failure; they are routed through the normal retry/failure transport path
* Add regex support for transport name patterns in the `messenger:consume` command
* Add an idle timeout option to the `BatchHandlerTrait`
* Add argument `$fetchSize` to `ReceiverInterface::get()` and `QueueReceiverInterface::getFromQueues()`, and to all bridges
* Add a `--fetch-size` option to the `messenger:consume` command to control how many messages are fetched per iteration
8.0
---

View File

@@ -78,6 +78,7 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('exclude-receivers', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Exclude specific receivers/transports from consumption (can only be used with --all)'),
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
new InputOption('fetch-size', null, InputOption::VALUE_REQUIRED, 'The number of messages to fetch per call to the transport', 1),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -132,6 +133,10 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa
Use the <info>--exclude-receivers</info> option to exclude specific receivers/transports from consumption (can only be used with <info>--all</info>):
<info>php %command.full_name% --all --exclude-receivers=<receiver-name></info>
Use the <info>--fetch-size</info> option to control how many messages are fetched per call to the transport:
<info>php %command.full_name% <receiver-name> --fetch-size=8</info>
EOF
)
;
@@ -296,6 +301,12 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa
$options['queues'] = $queues;
}
if (1 < $fetchSize = (int) $input->getOption('fetch-size')) {
throw new \InvalidArgumentException(\sprintf('The "--fetch-size" option must be a positive integer, "%s" given.', $input->getOption('fetch-size')));
}
$options['fetch_size'] = $fetchSize;
try {
$this->worker->run($options);
} finally {

View File

@@ -20,6 +20,7 @@ class DummyReceiver implements ReceiverInterface
private array $rejectedEnvelopes = [];
private int $acknowledgeCount = 0;
private int $rejectCount = 0;
private array $fetchSizes = [];
/**
* @param Envelope[][] $deliveriesOfEnvelopes
@@ -29,8 +30,13 @@ class DummyReceiver implements ReceiverInterface
) {
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
$this->fetchSizes[] = \func_num_args() > 0 ? func_get_arg(0) : 1;
$val = array_shift($this->deliveriesOfEnvelopes);
return $val ?? [];
@@ -67,4 +73,9 @@ class DummyReceiver implements ReceiverInterface
{
return $this->rejectedEnvelopes;
}
public function getFetchSizes(): array
{
return $this->fetchSizes;
}
}

View File

@@ -69,7 +69,7 @@ class InMemoryTransportTest extends TestCase
$envelope1 = $this->transport->send($envelope1);
$envelope2 = new Envelope(new \stdClass());
$envelope2 = $this->transport->send($envelope2);
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
$this->assertSame([$envelope1, $envelope2], $this->transport->get(2));
$this->transport->ack($envelope1);
$this->assertSame([$envelope2], $this->transport->get());
$this->transport->reject($envelope2);
@@ -109,13 +109,22 @@ class InMemoryTransportTest extends TestCase
$this->assertSame([$envelopeDecoded], $serializeTransport->get());
}
public function testGetUsesFetchSizeWhenProvided()
{
$envelope1 = $this->transport->send(new Envelope(new \stdClass()));
$envelope2 = $this->transport->send(new Envelope(new \stdClass()));
$this->assertSame([$envelope1], $this->transport->get(1));
$this->assertSame([$envelope1, $envelope2], $this->transport->get(2));
}
public function testAcknowledgeSameMessageWithDifferentStamps()
{
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
$envelope1 = $this->transport->send($envelope1);
$envelope2 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
$envelope2 = $this->transport->send($envelope2);
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
$this->assertSame([$envelope1, $envelope2], $this->transport->get(2));
$this->transport->ack($envelope1->with(new AnEnvelopeStamp()));
$this->assertSame([$envelope2], $this->transport->get());
$this->transport->reject($envelope2->with(new AnEnvelopeStamp()));

View File

@@ -373,21 +373,47 @@ class WorkerTest extends TestCase
public function testWorkerLimitQueues()
{
$envelope = [new Envelope(new DummyMessage('message1'))];
$receiver = $this->createMock(QueueReceiverInterface::class);
$receiver->expects($this->once())
->method('getFromQueues')
->with(['foo'])
->willReturn($envelope)
;
$receiver->expects($this->never())
->method('get')
;
$receiver = new DummyQueueReceiver([$envelope]);
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock());
$worker->run(['queues' => ['foo']]);
$this->assertSame([['foo']], $receiver->queueNames);
$this->assertSame([1], $receiver->getFromQueuesFetchSizes);
}
public function testWorkerPassesFetchSizeToReceiver()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('message1'))],
]);
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock());
$worker->run(['fetch_size' => 7]);
$this->assertSame([7], $receiver->getFetchSizes());
}
public function testWorkerPassesFetchSizeToQueueReceiver()
{
$receiver = new DummyQueueReceiver([
[new Envelope(new DummyMessage('message1'))],
]);
$dispatcher = new EventDispatcher();
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(['transport' => $receiver], new MessageBus(), $dispatcher, clock: new MockClock());
$worker->run(['queues' => ['foo'], 'fetch_size' => 7]);
$this->assertSame([['foo']], $receiver->queueNames);
$this->assertSame([7], $receiver->getFromQueuesFetchSizes);
}
public function testWorkerLimitQueuesUnsupported()
@@ -717,7 +743,7 @@ class WorkerTest extends TestCase
$middleware = new HandleMessageMiddleware(new HandlersLocator([
DummyMessage::class => [new HandlerDescriptor($batchHandler)],
SecondHandlerDummyMessage::class => [new HandlerDescriptor(function (SecondHandlerDummyMessage $message) {})],
SecondHandlerDummyMessage::class => [new HandlerDescriptor(static function (SecondHandlerDummyMessage $message) {})],
]));
$bus = new MessageBus([$middleware]);
@@ -924,9 +950,18 @@ class WorkerTest extends TestCase
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
{
public function getFromQueues(array $queueNames): iterable
public array $queueNames = [];
public array $getFromQueuesFetchSizes = [];
/**
* @param int $fetchSize
*/
public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable
{
return $this->get();
$this->queueNames[] = $queueNames;
$this->getFromQueuesFetchSizes[] = 1 < \func_num_args() ? func_get_arg(1) : 1;
return $this->get(...\func_num_args() > 1 ? [func_get_arg(1)] : []);
}
}

View File

@@ -56,13 +56,20 @@ class InMemoryTransport implements TransportInterface, ResetInterface
) {
}
public function get(): iterable
/**
* @param int $fetchSize Best-effort hint about how many messages can be received in one call
*/
public function get(/* int $fetchSize = 1 */): iterable
{
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
$envelopes = [];
$now = $this->clock?->now() ?? new \DateTimeImmutable();
foreach ($this->decode($this->queue) as $id => $envelope) {
if (!isset($this->availableAt[$id]) || $now > $this->availableAt[$id]) {
$envelopes[] = $envelope;
if (\count($envelopes) >= $fetchSize) {
break;
}
}
}

View File

@@ -24,8 +24,9 @@ interface QueueReceiverInterface extends ReceiverInterface
* Get messages from the specified queue names instead of consuming from all queues.
*
* @param string[] $queueNames
* @param int $fetchSize Best-effort hint about how many messages can be received in one call
*
* @return Envelope[]
*/
public function getFromQueues(array $queueNames): iterable;
public function getFromQueues(array $queueNames/* , int $fetchSize = 1 */): iterable;
}

View File

@@ -39,11 +39,13 @@ interface ReceiverInterface
* an Envelope containing a MessageDecodingFailedException so the worker
* can route it through the usual failure handling path.
*
* @param int $fetchSize Best-effort hint about how many messages can be received in one call
*
* @return iterable<Envelope>
*
* @throws TransportException If there is an issue communicating with the transport
*/
public function get(): iterable;
public function get(/* int $fetchSize = 1 */): iterable;
/**
* Acknowledges that the passed message was handled.

View File

@@ -30,7 +30,10 @@ class SingleMessageReceiver implements ReceiverInterface
) {
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
if ($this->hasReceived) {
return [];

View File

@@ -30,7 +30,10 @@ class SyncTransport implements TransportInterface
) {
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
}

View File

@@ -100,11 +100,14 @@ class Worker
while (!$this->shouldStop) {
$envelopeHandled = false;
$envelopeHandledStart = $this->clock->now();
$fetchSize = max(1, $options['fetch_size'] ?? 1);
foreach ($this->receivers as $transportName => $receiver) {
if ($queueNames) {
$envelopes = $receiver->getFromQueues($queueNames);
/** @var QueueReceiverInterface $receiver */
$envelopes = $receiver->getFromQueues($queueNames, $fetchSize);
} else {
$envelopes = $receiver->get();
$envelopes = $receiver->get($fetchSize);
}
foreach ($envelopes as $envelope) {

View File

@@ -24,7 +24,10 @@ class SchedulerTransport implements TransportInterface
) {
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
foreach ($this->messageGenerator->getMessages() as $context => $message) {
$stamp = new ScheduledStamp($context);