[Messenger] Add a --fetch-size option to the messenger:consume command to control how many messages are fetched per iteration

This commit is contained in:
Nicolas Grekas
2026-03-12 21:19:40 +01:00
parent ee4ab46d5d
commit ccd4924b96
7 changed files with 139 additions and 50 deletions

View File

@@ -53,14 +53,14 @@ class AmazonSqsIntegrationTest extends TestCase
usleep(5000);
}
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
$this->assertEquals('{"message": "Hi"}', $encoded[0]['body']);
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded[0]['headers']);
$this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt);
$connection->keepalive($encoded['id']);
$connection->keepalive($encoded[0]['id']);
$this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt);
$this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended');
$connection->delete($encoded['id']);
$connection->delete($encoded[0]['id']);
}
private function waitUntilElapsed(float $seconds, float $since): void

View File

@@ -32,7 +32,7 @@ class AmazonSqsReceiverTest extends TestCase
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($sqsEnvelop);
$connection->method('get')->willReturn([$sqsEnvelop]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
@@ -40,6 +40,44 @@ class AmazonSqsReceiverTest extends TestCase
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
public function testGetUsesFetchSizeWhenProvided()
{
$serializer = $this->createSerializer();
$sqsEnvelope = $this->createSqsEnvelope();
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('get')->with(7)->willReturn([$sqsEnvelope]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get(7));
$this->assertCount(1, $actualEnvelopes);
}
public function testItReturnsMultipleDecodedMessagesWhenAvailable()
{
$serializer = $this->createSerializer();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn([
$this->createSqsEnvelope(),
[
'id' => 2,
'body' => '{"message": "Hello"}',
'headers' => [
'type' => DummyMessage::class,
],
],
]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get(2));
$this->assertCount(2, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hello'), $actualEnvelopes[1]->getMessage());
}
public function testItReturnsSerializedEnvelopeWhenDecodingFails()
{
$serializer = $this->createStub(PhpSerializer::class);
@@ -47,7 +85,7 @@ class AmazonSqsReceiverTest extends TestCase
$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->createStub(Connection::class);
$connection->method('get')->willReturn($sqsEnvelop);
$connection->method('get')->willReturn([$sqsEnvelop]);
$receiver = new AmazonSqsReceiver($connection, $serializer);
$envelopes = iterator_to_array($receiver->get());

View File

@@ -54,7 +54,7 @@ class AmazonSqsTransportTest extends TestCase
];
$serializer->expects($this->once())->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($sqsEnvelope);
$connection->method('get')->willReturn([$sqsEnvelope]);
$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

View File

@@ -280,6 +280,48 @@ class ConnectionTest extends TestCase
$this->assertNull($connection->get());
}
public function testGetUsesMaxOfFetchSizeAndConfiguredBufferSize()
{
$client = $this->createMock(SqsClient::class);
$client
->method('getQueueUrl')
->willReturnMap([
[['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123], ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'])],
]);
$firstResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
]]);
$secondResult = ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => []]);
$series = [
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'VisibilityTimeout' => null,
'MaxNumberOfMessages' => 12,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 20]], $firstResult],
[[['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'VisibilityTimeout' => null,
'MaxNumberOfMessages' => 12,
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => 20]], $secondResult],
];
$client->expects($this->exactly(2))
->method('receiveMessage')
->willReturnCallback(function (...$args) use (&$series) {
[$expectedArgs, $return] = array_shift($series);
$this->assertSame($expectedArgs, $args);
return $return;
})
;
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false, 'buffer_size' => 9], $client);
$this->assertNotNull($connection->get(12));
$this->assertNull($connection->get(12));
}
public function testUnexpectedSqsError()
{
$this->expectException(HttpException::class);

View File

@@ -36,28 +36,35 @@ class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAware
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
$fetchSize = \func_num_args() > 0 ? max(1, func_get_arg(0)) : 1;
try {
if (!$sqsEnvelope = $this->connection->get()) {
if (!$sqsEnvelopes = $this->connection->get($fetchSize)) {
return;
}
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$stamps = [
new AmazonSqsReceivedStamp($sqsEnvelope['id']),
new TransportMessageIdStamp($sqsEnvelope['id']),
];
foreach ($sqsEnvelopes as $sqsEnvelope) {
$stamps = [
new AmazonSqsReceivedStamp($sqsEnvelope['id']),
new TransportMessageIdStamp($sqsEnvelope['id']),
];
try {
yield $this->serializer->decode($sqsEnvelope = [
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
])->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
try {
yield $this->serializer->decode($sqsEnvelope = [
'body' => $sqsEnvelope['body'],
'headers' => $sqsEnvelope['headers'],
])->with(...$stamps);
} catch (MessageDecodingFailedException $e) {
yield MessageDecodingFailedException::wrap($sqsEnvelope, $e->getMessage(), $e->getCode(), $e)->with(...$stamps);
}
}
}

View File

@@ -43,9 +43,14 @@ class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterfa
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
/**
* @param int $fetchSize
*/
public function get(/* int $fetchSize = 1 */): iterable
{
return $this->getReceiver()->get();
$fetchSize = \func_num_args() > 0 ? func_get_arg(0) : 1;
return $this->getReceiver()->get($fetchSize);
}
public function ack(Envelope $envelope): void

View File

@@ -99,7 +99,7 @@ class Connection
* * access_key: AWS access key
* * secret_key: AWS secret key
* * session_token: AWS session token (required only when using temporary credentials)
* * buffer_size: number of messages to prefetch (Default: 9)
* * buffer_size: number of messages to prefetch (Default: 9, Max: 10)
* * wait_time: long polling duration in seconds (Default: 20)
* * poll_timeout: amount of seconds the transport should wait for new message
* * visibility_timeout: amount of seconds the message won't be visible
@@ -188,61 +188,58 @@ class Connection
return new self($configuration, new SqsClient($clientConfiguration, null, $client, $logger), $queueUrl);
}
public function get(): ?array
public function get(int $fetchSize = 1): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
foreach ($this->getNextMessages() as $message) {
return $message;
$fetchSize = max(1, $fetchSize);
$messages = $this->getPendingMessages($fetchSize);
if (\count($messages) < $fetchSize
&& $this->fetchMessages(max($fetchSize, $this->configuration['buffer_size']))
) {
$messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))];
}
return null;
return $messages ?: null;
}
/**
* @return \Generator<int, array>
* @return list<array>
*/
private function getNextMessages(): \Generator
private function getPendingMessages(int $fetchSize): array
{
yield from $this->getPendingMessages();
yield from $this->getNewMessages();
}
$messages = [];
/**
* @return \Generator<int, array>
*/
private function getPendingMessages(): \Generator
{
while ($this->buffer) {
yield array_shift($this->buffer);
while ($fetchSize-- > 0 && $this->buffer) {
$messages[] = array_shift($this->buffer);
}
return $messages;
}
/**
* @return \Generator<int, array>
*/
private function getNewMessages(): \Generator
private function fetchMessages(int $fetchSize): bool
{
if (null === $this->currentResponse) {
$this->currentResponse = $this->client->receiveMessage([
'QueueUrl' => $this->getQueueUrl(),
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MaxNumberOfMessages' => min($fetchSize, 10), // SQS limitation
'MessageAttributeNames' => ['All'],
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
if (!$this->fetchMessage()) {
return;
if (!$this->fetchPendingMessages()) {
return false;
}
yield from $this->getPendingMessages();
return true;
}
private function fetchMessage(): bool
private function fetchPendingMessages(): bool
{
if (!$this->currentResponse->resolve($this->configuration['poll_timeout'])) {
return false;
@@ -416,13 +413,13 @@ class Connection
{
if (null !== $this->currentResponse) {
// fetch current response in order to requeue in transit messages
if (!$this->fetchMessage()) {
if (!$this->fetchPendingMessages()) {
$this->currentResponse->cancel();
$this->currentResponse = null;
}
}
foreach ($this->getPendingMessages() as $message) {
foreach ($this->getPendingMessages(\count($this->buffer)) as $message) {
$this->client->changeMessageVisibility([
'QueueUrl' => $this->getQueueUrl(),
'ReceiptHandle' => $message['id'],