mirror of
https://github.com/win32service/Win32ServiceBundle.git
synced 2026-04-24 01:08:01 +02:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f0f241edc6 | |||
| 9062ff5702 | |||
| 2927652370 | |||
| 5ba4f88608 | |||
| 82d358dc37 | |||
| 0de58e5c83 | |||
| 733080ef3b | |||
| 250f445374 | |||
| 221212eb93 | |||
| be9562e83c | |||
| 6b4086e927 | |||
| 51b359dcf8 | |||
| 84ccbda344 | |||
| 9c940bccaa | |||
| 47b5590267 | |||
| 551a35255e | |||
| 6599c306ae | |||
| 5863dc577c | |||
| 114c232a25 | |||
| ccf379dce7 | |||
| 26ab698a72 | |||
| 764337d0a8 |
@@ -48,6 +48,9 @@
|
||||
"doctrine/orm": "^2.19",
|
||||
"phpunit/phpunit": "^9.6"
|
||||
},
|
||||
"conflict": {
|
||||
"win32service/service-library": "<1.0.2"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"
|
||||
},
|
||||
|
||||
@@ -8,6 +8,7 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
|
||||
use Symfony\Component\DependencyInjection\Reference;
|
||||
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
|
||||
|
||||
@@ -51,6 +52,7 @@ final class MessengerPass implements CompilerPassInterface
|
||||
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);
|
||||
|
||||
$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
|
||||
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));
|
||||
|
||||
$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
|
||||
try {
|
||||
|
||||
@@ -6,8 +6,8 @@ namespace Win32ServiceBundle\MessengerSubscriber;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
|
||||
use Symfony\Component\Messenger\Event\MessengerWorkerStoppedEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
|
||||
|
||||
/**
|
||||
* @author Grégoire Pineau <lyrixx@lyrixx.info>
|
||||
|
||||
@@ -10,7 +10,6 @@ use Symfony\Component\Console\Exception\RuntimeException;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
@@ -21,12 +20,14 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Win32Service\Exception\RecoveryActionException;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageHandledEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStartedEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
|
||||
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnFailureLimitListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMemoryLimitListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMessageLimitListener;
|
||||
@@ -106,6 +107,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
|
||||
$this->shouldStop = true;
|
||||
$this->requestStop();
|
||||
|
||||
throw new RecoveryActionException('Restart requested');
|
||||
}
|
||||
|
||||
protected function beforeContinue(): void
|
||||
|
||||
@@ -13,6 +13,8 @@ services:
|
||||
- '%win32service.config%'
|
||||
- '%kernel.environment%'
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\AddErrorDetailsStampListener: ~
|
||||
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener:
|
||||
arguments:
|
||||
|
||||
@@ -8,14 +8,20 @@ framework:
|
||||
|
||||
transports:
|
||||
# https://symfony.com/doc/current/messenger.html#transport-configuration
|
||||
async: '%env(MESSENGER_TRANSPORT_DSN)%'
|
||||
async:
|
||||
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
||||
retry_strategy:
|
||||
max_retries: 1
|
||||
delay: 1000
|
||||
|
||||
failed: 'doctrine://default?queue_name=failed'
|
||||
# sync: 'sync://'
|
||||
|
||||
routing:
|
||||
# Route your messages to the transports
|
||||
# 'App\Message\YourMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestFailedMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestRetryMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage': async
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Event;
|
||||
|
||||
final class TestFailedMessage
|
||||
{
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Handler;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
|
||||
|
||||
#[AsMessageHandler(fromTransport: 'async')]
|
||||
final class FailMessageHandler
|
||||
{
|
||||
public function __construct(private LoggerInterface $logger)
|
||||
{
|
||||
}
|
||||
|
||||
public function __invoke(TestFailedMessage $message): void
|
||||
{
|
||||
$this->logger->info('Failed Message');
|
||||
throw new \LogicException('Fail to process');
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
|
||||
|
||||
final class FaillureRetryMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testFailureMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestFailedMessage());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(1, 0);
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$msrRefrection = new \ReflectionClass(AbstractServiceRunner::class);
|
||||
$stopRequestedProperty = $msrRefrection->getProperty('stopRequested');
|
||||
$stopRequestedProperty->setAccessible(true);
|
||||
|
||||
Win32serviceState::reset();
|
||||
$stopRequestedProperty->setValue($runner, false);
|
||||
|
||||
usleep(1_500_000);
|
||||
|
||||
$runner->doRun(1, 0);
|
||||
$connexion->commit();
|
||||
$connexion->beginTransaction();
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'failed\' AND delivered_at IS NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user