22 Commits

Author SHA1 Message Date
macintoshplus f0f241edc6 use RecoveryActionException to request Mesenger consumer restart 2024-11-29 18:02:37 +01:00
Jean-Baptiste Nahan 9062ff5702 Merge pull request #22 from jbcr/issue_17_failed_message_never_retried_suite
[issue #17] fix retry and faillure queue
2024-10-23 15:23:48 +02:00
jb cr 2927652370 fix failled test 2024-10-23 15:20:50 +02:00
jb cr 5ba4f88608 work in progresse for test faillure queue 2024-10-23 15:12:01 +02:00
jb cr 82d358dc37 update cache action version and cache key 2024-10-23 15:11:52 +02:00
jb cr 0de58e5c83 add tests 2024-10-23 15:11:52 +02:00
jb cr 733080ef3b revert mock namespace 2024-10-23 15:11:52 +02:00
jb cr 250f445374 fix mock 2024-10-23 15:11:51 +02:00
jb cr 221212eb93 add symfony/doctrine-messenger 2024-10-23 15:11:51 +02:00
jb cr be9562e83c change db name and APP_ENV value 2024-10-23 15:11:51 +02:00
jb cr 6b4086e927 try fix mariadb port 2024-10-23 15:11:51 +02:00
jb cr 51b359dcf8 fix env config 2024-10-23 15:11:50 +02:00
jb cr 84ccbda344 use mariadb 2024-10-23 15:11:50 +02:00
jb cr 9c940bccaa fix database path 2024-10-23 15:11:50 +02:00
jb cr 47b5590267 fix console path 2024-10-23 15:11:50 +02:00
jb cr 551a35255e set PHP version as string instead of float 2024-10-23 15:11:50 +02:00
jb cr 6599c306ae fix matrix config 2024-10-23 15:11:50 +02:00
jb cr 5863dc577c move workflow config 2024-10-23 15:11:49 +02:00
jb cr 114c232a25 add branch for run action 2024-10-23 15:11:49 +02:00
jb cr ccf379dce7 issue #17 add code to retry and PHPUnit for tests 2024-10-23 15:11:49 +02:00
Jean-Baptiste Nahan 26ab698a72 Merge pull request #21 from win32service/revert-20-issue_17_failed_message_never_retried
Revert "issue #17 add code to retry and PHPUnit for tests"
2024-10-23 14:51:46 +02:00
Jean-Baptiste Nahan 764337d0a8 Revert "issue #17 add code to retry and PHPUnit for tests" 2024-10-23 14:51:15 +02:00
9 changed files with 138 additions and 4 deletions
+3
View File
@@ -48,6 +48,9 @@
"doctrine/orm": "^2.19",
"phpunit/phpunit": "^9.6"
},
"conflict": {
"win32service/service-library": "<1.0.2"
},
"suggest": {
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"
},
@@ -8,6 +8,7 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
use Symfony\Component\DependencyInjection\Reference;
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
@@ -51,6 +52,7 @@ final class MessengerPass implements CompilerPassInterface
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);
$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));
$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
try {
@@ -6,8 +6,8 @@ namespace Win32ServiceBundle\MessengerSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
use Symfony\Component\Messenger\Event\MessengerWorkerStoppedEvent;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
+4 -1
View File
@@ -10,7 +10,6 @@ use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\MessageBusInterface;
@@ -21,12 +20,14 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Win32Service\Exception\RecoveryActionException;
use Win32Service\Model\AbstractServiceRunner;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
use Win32ServiceBundle\Event\MessengerWorkerMessageHandledEvent;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStartedEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnFailureLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMemoryLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMessageLimitListener;
@@ -106,6 +107,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
$this->shouldStop = true;
$this->requestStop();
throw new RecoveryActionException('Restart requested');
}
protected function beforeContinue(): void
+2
View File
@@ -13,6 +13,8 @@ services:
- '%win32service.config%'
- '%kernel.environment%'
Win32ServiceBundle\MessengerSubscriber\AddErrorDetailsStampListener: ~
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener:
arguments:
@@ -8,14 +8,20 @@ framework:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async: '%env(MESSENGER_TRANSPORT_DSN)%'
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 1
delay: 1000
failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
routing:
# Route your messages to the transports
# 'App\Message\YourMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestFailedMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestRetryMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage': async
@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestFailedMessage
{
}
@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class FailMessageHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestFailedMessage $message): void
{
$this->logger->info('Failed Message');
throw new \LogicException('Fail to process');
}
}
@@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\AbstractServiceRunner;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
final class FaillureRetryMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testFailureMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestFailedMessage());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(1, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(1, 0);
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
$msrRefrection = new \ReflectionClass(AbstractServiceRunner::class);
$stopRequestedProperty = $msrRefrection->getProperty('stopRequested');
$stopRequestedProperty->setAccessible(true);
Win32serviceState::reset();
$stopRequestedProperty->setValue($runner, false);
usleep(1_500_000);
$runner->doRun(1, 0);
$connexion->commit();
$connexion->beginTransaction();
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'failed\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
}
}