mirror of
https://github.com/symfony/redis-messenger.git
synced 2026-03-24 01:12:15 +01:00
Merge branch '8.0' into 8.1
* 8.0: Fix merge [VarDumper] Wrong dumper output for Accept: aplication/json requests [HttpKernel] Reset router locale to default when finishing main request Only decrement pendingRequests when it's more than zero [Dotenv] Fix self-referencing variables with defaults and env key resolution during deferred expansion Improve Bulgarian translations in validators.bg.xlf [Cache] Fix ChainAdapter ignoring item expiry when propagating to earlier adapters [Form] Fix typed property initialization in ValidatorExtension [Messenger] Fix duplicate pending messages in Redis transport with batch handlers Fix deprecation notices for "@method" annotations when implementing interfaces directly
This commit is contained in:
@@ -147,28 +147,51 @@ class ConnectionTest extends TestCase
|
||||
$this->assertInstanceOf(Connection::class, new Connection([], $redis));
|
||||
}
|
||||
|
||||
public function testKeepGettingPendingMessages()
|
||||
public function testPendingScanAdvancesCursorWithoutDuplicates()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->exactly(3))->method('xreadgroup')
|
||||
->with('symfony', 'consumer', ['queue' => 0], 1, 1)
|
||||
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
|
||||
$redis->expects($this->exactly(4))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
// pending scan from '0': returns first pending message
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['100-0' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
// pending scan advances cursor past '100-0': returns next pending message
|
||||
[['symfony', 'consumer', ['queue' => '100-0'], 1, 1], ['queue' => ['200-0' => ['message' => '{"body":"2","headers":[]}']]]],
|
||||
// pending scan advances cursor past '200-0': no more pending messages
|
||||
[['symfony', 'consumer', ['queue' => '200-0'], 1, 1], []],
|
||||
// fallback to new messages: none available
|
||||
[['symfony', 'consumer', ['queue' => '>'], 1, 1], []],
|
||||
];
|
||||
|
||||
[$expectedArgs, $return] = array_shift($series);
|
||||
$this->assertSame($expectedArgs, $args);
|
||||
|
||||
return $return;
|
||||
});
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
$this->assertNotNull($connection->get());
|
||||
$this->assertNotNull($connection->get());
|
||||
$this->assertNotNull($connection->get());
|
||||
|
||||
$msg1 = $connection->get();
|
||||
$this->assertSame('100-0', $msg1[0]['id']);
|
||||
|
||||
$msg2 = $connection->get();
|
||||
$this->assertSame('200-0', $msg2[0]['id']);
|
||||
|
||||
$this->assertNull($connection->get());
|
||||
}
|
||||
|
||||
public function testGetUsesFetchSizeWhenProvided()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->once())->method('xreadgroup')
|
||||
$redis->expects($this->exactly(2))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
[['symfony', 'consumer', ['queue' => '0'], 5, 1], ['queue' => [
|
||||
// pending scan from '0' with count=1: no pending messages
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], []],
|
||||
// new messages with fetchSize=5
|
||||
[['symfony', 'consumer', ['queue' => '>'], 5, 1], ['queue' => [
|
||||
'1-0' => ['message' => json_encode(['body' => 'First', 'headers' => []])],
|
||||
'2-0' => ['message' => json_encode(['body' => 'Second', 'headers' => []])],
|
||||
]]],
|
||||
@@ -661,6 +684,230 @@ class ConnectionTest extends TestCase
|
||||
$connection->keepalive('redisid-123', 3000);
|
||||
}
|
||||
|
||||
public function testSkipAlreadyInflightPendingMessage()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->exactly(5))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
// get #1: pending scan returns msg-A
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
// get #2: pending scan from 'msg-A', no more pending
|
||||
[['symfony', 'consumer', ['queue' => 'msg-A'], 1, 1], []],
|
||||
// get #2: claim resets cursor to '0', rescan returns msg-A again — skipped (in-flight)
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
// get #2: cursor advances past msg-A, no more pending
|
||||
[['symfony', 'consumer', ['queue' => 'msg-A'], 1, 1], []],
|
||||
// get #2: fallback to new messages
|
||||
[['symfony', 'consumer', ['queue' => '>'], 1, 1], []],
|
||||
];
|
||||
|
||||
[$expectedArgs, $return] = array_shift($series);
|
||||
$this->assertSame($expectedArgs, $args);
|
||||
|
||||
return $return;
|
||||
});
|
||||
|
||||
$redis->expects($this->once())->method('xpending')
|
||||
->willReturn([[0 => 'other-msg', 1 => 'consumer-2', 2 => 3600001]]);
|
||||
$redis->expects($this->once())->method('xclaim')->willReturn([]);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
|
||||
$this->assertSame('msg-A', $connection->get()[0]['id']);
|
||||
|
||||
// msg-A is still in-flight, so when the claim resets the cursor and
|
||||
// the rescan encounters msg-A again, it must be skipped
|
||||
$this->assertNull($connection->get());
|
||||
}
|
||||
|
||||
public function testAckRemovesInflightId()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->once())->method('xreadgroup')
|
||||
->with('symfony', 'consumer', ['queue' => '0'], 1, 1)
|
||||
->willReturn(['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]);
|
||||
|
||||
$redis->expects($this->once())->method('xack')
|
||||
->with('queue', 'symfony', ['msg-A'])
|
||||
->willReturn(1);
|
||||
$redis->expects($this->once())->method('xdel')
|
||||
->with('queue', ['msg-A'])
|
||||
->willReturn(1);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
|
||||
$inflightIds = (new \ReflectionClass(Connection::class))->getProperty('inflightIds');
|
||||
|
||||
$msg = $connection->get();
|
||||
$this->assertSame('msg-A', $msg[0]['id']);
|
||||
$this->assertArrayHasKey('msg-A', $inflightIds->getValue($connection));
|
||||
|
||||
$connection->ack('msg-A');
|
||||
$this->assertEmpty($inflightIds->getValue($connection));
|
||||
}
|
||||
|
||||
public function testRejectRemovesInflightId()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->once())->method('xreadgroup')
|
||||
->with('symfony', 'consumer', ['queue' => '0'], 1, 1)
|
||||
->willReturn(['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]);
|
||||
|
||||
$redis->expects($this->once())->method('xack')
|
||||
->with('queue', 'symfony', ['msg-A'])
|
||||
->willReturn(1);
|
||||
$redis->expects($this->once())->method('xdel')
|
||||
->with('queue', ['msg-A'])
|
||||
->willReturn(1);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis);
|
||||
|
||||
$inflightIds = (new \ReflectionClass(Connection::class))->getProperty('inflightIds');
|
||||
|
||||
$msg = $connection->get();
|
||||
$this->assertSame('msg-A', $msg[0]['id']);
|
||||
$this->assertArrayHasKey('msg-A', $inflightIds->getValue($connection));
|
||||
|
||||
$connection->reject('msg-A');
|
||||
$this->assertEmpty($inflightIds->getValue($connection));
|
||||
}
|
||||
|
||||
public function testClaimCanProcessMultipleMessagesWithinOneInterval()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
// Flow:
|
||||
// get() #1: pending '0' → empty, claim finds claim-1, pending '0' → claim-1
|
||||
// ack('claim-1')
|
||||
// get() #2: pending 'claim-1' → empty, claim finds claim-2, pending '0' → claim-2
|
||||
$redis->expects($this->exactly(4))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], []],
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['claim-1' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
[['symfony', 'consumer', ['queue' => 'claim-1'], 1, 1], []],
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['claim-2' => ['message' => '{"body":"2","headers":[]}']]]],
|
||||
];
|
||||
|
||||
[$expectedArgs, $return] = array_shift($series);
|
||||
$this->assertSame($expectedArgs, $args);
|
||||
|
||||
return $return;
|
||||
});
|
||||
|
||||
$redis->expects($this->exactly(2))->method('xpending')
|
||||
->willReturnOnConsecutiveCalls(
|
||||
[[0 => 'claim-1', 1 => 'consumer-2', 2 => 3600001]],
|
||||
[[0 => 'claim-2', 1 => 'consumer-2', 2 => 3600001]]
|
||||
);
|
||||
|
||||
$redis->expects($this->exactly(2))->method('xclaim')
|
||||
->willReturn([]);
|
||||
|
||||
$redis->expects($this->once())->method('xack')
|
||||
->with('queue', 'symfony', ['claim-1'])
|
||||
->willReturn(1);
|
||||
$redis->expects($this->once())->method('xdel')
|
||||
->with('queue', ['claim-1'])
|
||||
->willReturn(1);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
|
||||
$msg1 = $connection->get();
|
||||
$this->assertSame('claim-1', $msg1[0]['id']);
|
||||
|
||||
$connection->ack('claim-1');
|
||||
|
||||
$msg2 = $connection->get();
|
||||
$this->assertSame('claim-2', $msg2[0]['id']);
|
||||
}
|
||||
|
||||
public function testClaimIntervalAdvancedOnlyWhenNoClaimableMessages()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
$redis->expects($this->exactly(4))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], []],
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
[['symfony', 'consumer', ['queue' => 'msg-A'], 1, 1], []],
|
||||
[['symfony', 'consumer', ['queue' => '>'], 1, 1], []],
|
||||
];
|
||||
|
||||
[$expectedArgs, $return] = array_shift($series);
|
||||
$this->assertSame($expectedArgs, $args);
|
||||
|
||||
return $return;
|
||||
});
|
||||
|
||||
$redis->expects($this->exactly(2))->method('xpending')
|
||||
->willReturnOnConsecutiveCalls(
|
||||
[[0 => 'msg-A', 1 => 'consumer-2', 2 => 3600001]],
|
||||
[]
|
||||
);
|
||||
|
||||
$redis->expects($this->once())->method('xclaim')->willReturn([]);
|
||||
$redis->expects($this->once())->method('xack')->willReturn(1);
|
||||
$redis->expects($this->once())->method('xdel')->willReturn(1);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
|
||||
$nextClaimProp = (new \ReflectionClass(Connection::class))->getProperty('nextClaim');
|
||||
|
||||
$this->assertSame(0.0, $nextClaimProp->getValue($connection));
|
||||
|
||||
$msg = $connection->get();
|
||||
$this->assertSame('msg-A', $msg[0]['id']);
|
||||
$this->assertSame(0.0, $nextClaimProp->getValue($connection));
|
||||
|
||||
$connection->ack('msg-A');
|
||||
|
||||
$this->assertNull($connection->get());
|
||||
$this->assertGreaterThan(0.0, $nextClaimProp->getValue($connection));
|
||||
}
|
||||
|
||||
public function testClaimAdvancesIntervalWhenOldestPendingBelongsToOwnConsumer()
|
||||
{
|
||||
$redis = $this->createRedisMock();
|
||||
|
||||
// get #1: pending scan from '0' returns msg-A
|
||||
// get #2: pending scan from 'msg-A' → empty (cursor exhausted)
|
||||
// claim: xpending returns msg-A owned by OUR consumer → should advance nextClaim, NOT rescan
|
||||
// fallback to new messages: none
|
||||
$redis->expects($this->exactly(3))->method('xreadgroup')
|
||||
->willReturnCallback(function (...$args) {
|
||||
static $series = [
|
||||
[['symfony', 'consumer', ['queue' => '0'], 1, 1], ['queue' => ['msg-A' => ['message' => '{"body":"1","headers":[]}']]]],
|
||||
[['symfony', 'consumer', ['queue' => 'msg-A'], 1, 1], []],
|
||||
[['symfony', 'consumer', ['queue' => '>'], 1, 1], []],
|
||||
];
|
||||
|
||||
[$expectedArgs, $return] = array_shift($series);
|
||||
$this->assertSame($expectedArgs, $args);
|
||||
|
||||
return $return;
|
||||
});
|
||||
|
||||
$redis->expects($this->once())->method('xpending')
|
||||
->willReturn([[0 => 'msg-A', 1 => 'consumer', 2 => 100]]);
|
||||
|
||||
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
|
||||
|
||||
$nextClaimProp = (new \ReflectionClass(Connection::class))->getProperty('nextClaim');
|
||||
|
||||
$msg = $connection->get();
|
||||
$this->assertSame('msg-A', $msg[0]['id']);
|
||||
$this->assertSame(0.0, $nextClaimProp->getValue($connection));
|
||||
|
||||
$this->assertNull($connection->get());
|
||||
$this->assertGreaterThan(0.0, $nextClaimProp->getValue($connection));
|
||||
}
|
||||
|
||||
private function createRedisMock(): MockObject&\Redis
|
||||
{
|
||||
$redis = $this->createMock(\Redis::class);
|
||||
|
||||
@@ -445,6 +445,35 @@ class RedisExtIntegrationTest extends TestCase
|
||||
$this->assertSame(1, $this->connection->getMessageCount());
|
||||
}
|
||||
|
||||
public function testPendingMessagesAreNotReturnedAsDuplicatesBeforeAck()
|
||||
{
|
||||
$this->connection->add('{"message": "1"}', ['type' => DummyMessage::class]);
|
||||
$this->connection->add('{"message": "2"}', ['type' => DummyMessage::class]);
|
||||
$this->connection->add('{"message": "3"}', ['type' => DummyMessage::class]);
|
||||
|
||||
// Fetch all 3 without acking — simulates a batch handler collecting messages.
|
||||
// The old code would return msg-1 three times because it always read from ID '0'.
|
||||
$msg1 = $this->connection->get();
|
||||
$msg2 = $this->connection->get();
|
||||
$msg3 = $this->connection->get();
|
||||
|
||||
$this->assertNotNull($msg1);
|
||||
$this->assertNotNull($msg2);
|
||||
$this->assertNotNull($msg3);
|
||||
|
||||
$ids = [$msg1[0]['id'], $msg2[0]['id'], $msg3[0]['id']];
|
||||
$this->assertCount(3, array_unique($ids), 'Each get() must return a distinct message, not duplicates');
|
||||
|
||||
// After all pending are consumed, next get() must return null (no new messages)
|
||||
$this->assertNull($this->connection->get());
|
||||
|
||||
$this->connection->ack($msg1[0]['id']);
|
||||
$this->connection->ack($msg2[0]['id']);
|
||||
$this->connection->ack($msg3[0]['id']);
|
||||
|
||||
$this->assertNull($this->connection->get());
|
||||
}
|
||||
|
||||
private function getConnectionGroup(Connection $connection): string
|
||||
{
|
||||
$property = (new \ReflectionClass(Connection::class))->getProperty('group');
|
||||
|
||||
@@ -68,9 +68,9 @@ class Connection
|
||||
private float $claimInterval;
|
||||
private bool $deleteAfterAck;
|
||||
private bool $deleteAfterReject;
|
||||
private bool $couldHavePendingMessages = true;
|
||||
/** @var list<array{id: string, data: mixed}> */
|
||||
private array $buffer = [];
|
||||
private ?string $lastPendingMessageId = '0';
|
||||
/** @var array<string, true> */
|
||||
private array $inflightIds = [];
|
||||
|
||||
public function __construct(array $options, \Redis|Relay|\RedisCluster|null $redis = null)
|
||||
{
|
||||
@@ -387,45 +387,48 @@ class Connection
|
||||
|
||||
private function claimOldPendingMessages(): void
|
||||
{
|
||||
$redis = $this->getRedis();
|
||||
|
||||
try {
|
||||
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
|
||||
// https://github.com/antirez/redis/issues/6256
|
||||
$pendingMessages = $this->getRedis()->xpending($this->stream, $this->group, '-', '+', 1) ?: [];
|
||||
$pendingMessages = $redis->xpending($this->stream, $this->group, '-', '+', 1) ?: [];
|
||||
} catch (\RedisException|\Relay\Exception $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
|
||||
$claimableIds = [];
|
||||
foreach ($pendingMessages as $pendingMessage) {
|
||||
if ($pendingMessage[1] === $this->consumer) {
|
||||
$this->couldHavePendingMessages = true;
|
||||
if (!$pendingMessages) {
|
||||
$this->nextClaim = microtime(true) + $this->claimInterval;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if ($pendingMessage[2] >= $this->redeliverTimeout) {
|
||||
$claimableIds[] = $pendingMessage[0];
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (\count($claimableIds) > 0) {
|
||||
try {
|
||||
$this->getRedis()->xclaim(
|
||||
$this->stream,
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
$this->redeliverTimeout,
|
||||
$claimableIds,
|
||||
['JUSTID']
|
||||
);
|
||||
$pendingMessage = $pendingMessages[0];
|
||||
|
||||
$this->couldHavePendingMessages = true;
|
||||
} catch (\RedisException|\Relay\Exception $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
if ($pendingMessage[1] === $this->consumer) {
|
||||
$this->nextClaim = microtime(true) + $this->claimInterval;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$this->nextClaim = microtime(true) + $this->claimInterval;
|
||||
if ($pendingMessage[2] < $this->redeliverTimeout) {
|
||||
$this->nextClaim = microtime(true) + $this->claimInterval;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$redis->xclaim(
|
||||
$this->stream,
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
$this->redeliverTimeout,
|
||||
[$pendingMessage[0]],
|
||||
['JUSTID']
|
||||
);
|
||||
|
||||
$this->lastPendingMessageId = '0';
|
||||
} catch (\RedisException|\Relay\Exception $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -439,6 +442,31 @@ class Connection
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$this->handleDelayedMessages();
|
||||
|
||||
$messages = [];
|
||||
|
||||
while (\count($messages) < $fetchSize && null !== $message = $this->getPendingMessage()) {
|
||||
$messages[] = $message;
|
||||
}
|
||||
|
||||
if (\count($messages) < $fetchSize && null === $this->lastPendingMessageId && $this->nextClaim <= microtime(true)) {
|
||||
$this->claimOldPendingMessages();
|
||||
|
||||
while (\count($messages) < $fetchSize && null !== $message = $this->getPendingMessage()) {
|
||||
$messages[] = $message;
|
||||
}
|
||||
}
|
||||
|
||||
if (\count($messages) < $fetchSize) {
|
||||
$messages = [...$messages, ...$this->getNewMessages($fetchSize - \count($messages))];
|
||||
}
|
||||
|
||||
return $messages ?: null;
|
||||
}
|
||||
|
||||
private function handleDelayedMessages(): void
|
||||
{
|
||||
$now = microtime();
|
||||
$now = substr($now, 11).substr($now, 2, 3);
|
||||
|
||||
@@ -465,80 +493,81 @@ class Connection
|
||||
$decodedQueuedMessage = json_decode($queuedMessage, true);
|
||||
$this->add(\array_key_exists('body', $decodedQueuedMessage) ? $decodedQueuedMessage['body'] : $queuedMessage, $decodedQueuedMessage['headers'] ?? [], 0);
|
||||
}
|
||||
}
|
||||
|
||||
if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
|
||||
$this->claimOldPendingMessages();
|
||||
}
|
||||
|
||||
$messages = $this->getPendingMessages($fetchSize);
|
||||
|
||||
if (\count($messages) >= $fetchSize) {
|
||||
return $messages;
|
||||
}
|
||||
|
||||
$redis = $this->getRedis();
|
||||
|
||||
while (true) {
|
||||
if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
|
||||
$this->claimOldPendingMessages();
|
||||
}
|
||||
|
||||
$messageId = $this->couldHavePendingMessages ? '0' : '>';
|
||||
|
||||
try {
|
||||
$streamMessages = $redis->xreadgroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
$fetchSize,
|
||||
1
|
||||
);
|
||||
} catch (\RedisException|\Relay\Exception $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
|
||||
if (false === $streamMessages) {
|
||||
if ($error = $redis->getLastError() ?: null) {
|
||||
$redis->clearLastError();
|
||||
}
|
||||
|
||||
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
|
||||
}
|
||||
|
||||
if ($this->couldHavePendingMessages && empty($streamMessages[$this->stream])) {
|
||||
$this->couldHavePendingMessages = false;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($streamMessages[$this->stream] ?? [] as $key => $message) {
|
||||
$this->buffer[] = [
|
||||
'id' => $key,
|
||||
'data' => $message,
|
||||
];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
$messages = [...$messages, ...$this->getPendingMessages($fetchSize - \count($messages))];
|
||||
|
||||
if (!$messages) {
|
||||
private function getPendingMessage(): ?array
|
||||
{
|
||||
if (null === $this->lastPendingMessageId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $messages;
|
||||
while (true) {
|
||||
$messages = $this->xReadGroup($this->lastPendingMessageId);
|
||||
|
||||
if (empty($messages[$this->stream])) {
|
||||
$this->lastPendingMessageId = null;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/** @var string $key */
|
||||
$key = array_key_first($messages[$this->stream]);
|
||||
$this->lastPendingMessageId = $key;
|
||||
|
||||
if (isset($this->inflightIds[$key])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->inflightIds[$key] = true;
|
||||
|
||||
return [
|
||||
'id' => $key,
|
||||
'data' => $messages[$this->stream][$key],
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list<array{id: string, data: mixed}>
|
||||
*/
|
||||
private function getPendingMessages(int $fetchSize): array
|
||||
private function getNewMessages(int $count): array
|
||||
{
|
||||
$messages = [];
|
||||
$messages = $this->xReadGroup('>', $count);
|
||||
$result = [];
|
||||
|
||||
while ($fetchSize-- > 0 && $this->buffer) {
|
||||
$messages[] = array_shift($this->buffer);
|
||||
foreach ($messages[$this->stream] ?? [] as $key => $message) {
|
||||
$this->inflightIds[$key] = true;
|
||||
$result[] = [
|
||||
'id' => $key,
|
||||
'data' => $message,
|
||||
];
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
private function xReadGroup(string $messageId, int $count = 1): array
|
||||
{
|
||||
$redis = $this->getRedis();
|
||||
|
||||
try {
|
||||
$messages = $redis->xreadgroup(
|
||||
$this->group,
|
||||
$this->consumer,
|
||||
[$this->stream => $messageId],
|
||||
$count,
|
||||
1
|
||||
);
|
||||
} catch (\RedisException|\Relay\Exception $e) {
|
||||
throw new TransportException($e->getMessage(), 0, $e);
|
||||
}
|
||||
|
||||
if (!\is_array($messages)) {
|
||||
if ($error = $redis->getLastError() ?: null) {
|
||||
$redis->clearLastError();
|
||||
}
|
||||
|
||||
throw new TransportException($error ?? 'Could not read messages from the redis stream.');
|
||||
}
|
||||
|
||||
return $messages;
|
||||
@@ -563,6 +592,8 @@ class Connection
|
||||
}
|
||||
throw new TransportException($error ?? \sprintf('Could not acknowledge redis message "%s".', $id));
|
||||
}
|
||||
|
||||
unset($this->inflightIds[$id]);
|
||||
}
|
||||
|
||||
public function reject(string $id): void
|
||||
@@ -584,6 +615,8 @@ class Connection
|
||||
}
|
||||
throw new TransportException($error ?? \sprintf('Could not delete message "%s" from the redis stream.', $id));
|
||||
}
|
||||
|
||||
unset($this->inflightIds[$id]);
|
||||
}
|
||||
|
||||
public function add(string $body, array $headers, int $delayInMs = 0): string
|
||||
|
||||
Reference in New Issue
Block a user