mirror of
https://github.com/win32service/Win32ServiceBundle.git
synced 2026-03-24 09:12:17 +01:00
Compare commits
43 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0f241edc6 | ||
|
|
9062ff5702 | ||
|
|
2927652370 | ||
|
|
5ba4f88608 | ||
|
|
82d358dc37 | ||
|
|
0de58e5c83 | ||
|
|
733080ef3b | ||
|
|
250f445374 | ||
|
|
221212eb93 | ||
|
|
be9562e83c | ||
|
|
6b4086e927 | ||
|
|
51b359dcf8 | ||
|
|
84ccbda344 | ||
|
|
9c940bccaa | ||
|
|
47b5590267 | ||
|
|
551a35255e | ||
|
|
6599c306ae | ||
|
|
5863dc577c | ||
|
|
114c232a25 | ||
|
|
ccf379dce7 | ||
|
|
26ab698a72 | ||
|
|
764337d0a8 | ||
|
|
fbfedf4205 | ||
|
|
896da203d1 | ||
|
|
29480592cb | ||
|
|
26dbef58d3 | ||
|
|
b78326a847 | ||
|
|
5b279891af | ||
|
|
0c0a7cc756 | ||
|
|
2cd6704473 | ||
|
|
62e6137287 | ||
|
|
55fd2033ad | ||
|
|
08c0b73107 | ||
|
|
ff661eea1b | ||
|
|
bcee3e0562 | ||
|
|
2025d94208 | ||
|
|
8a9f74a606 | ||
|
|
68b542430f | ||
|
|
6644c3410b | ||
|
|
d08e424b1b | ||
|
|
3091b0beea | ||
|
|
b3bf4ff439 | ||
|
|
81052cea72 |
68
.github/workflows/quality.yaml
vendored
Normal file
68
.github/workflows/quality.yaml
vendored
Normal file
@@ -0,0 +1,68 @@
|
||||
# This workflow uses actions that are not certified by GitHub.
|
||||
# They are provided by a third-party and are governed by
|
||||
# separate terms of service, privacy policy, and support
|
||||
# documentation.
|
||||
|
||||
name: Quality
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "2.x", "issue_17_failed_message_never_retried" ]
|
||||
pull_request:
|
||||
branches: [ "2.x" ]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
symfony-tests:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
php: ['8.0', '8.1', '8.2', '8.3']
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
services:
|
||||
mariadb:
|
||||
image: mariadb:10.11
|
||||
ports:
|
||||
- 3306:3306
|
||||
env:
|
||||
MYSQL_USER: user
|
||||
MYSQL_PASSWORD: nopassword
|
||||
MYSQL_DATABASE: app_test
|
||||
MYSQL_ROOT_PASSWORD: nopassword
|
||||
options: --health-cmd="mysqladmin ping" --health-interval=5s --health-timeout=2s --health-retries=3
|
||||
|
||||
|
||||
env:
|
||||
DATABASE_URL: mysql://root:nopassword@127.0.0.1:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4
|
||||
APP_ENV: test
|
||||
steps:
|
||||
# To automatically get bug fixes and new Php versions for shivammathur/setup-php,
|
||||
# change this to (see https://github.com/shivammathur/setup-php#bookmark-versioning):
|
||||
# uses: shivammathur/setup-php@v2
|
||||
- uses: shivammathur/setup-php@v2
|
||||
with:
|
||||
php-version: ${{ matrix.php }}
|
||||
coverage: xdebug
|
||||
|
||||
- uses: actions/checkout@v4
|
||||
# - name: Copy .env.test.local
|
||||
# run: php -r "file_exists('.env.test.local') || copy('.env.test', '.env.test.local');"
|
||||
- name: Cache Composer packages
|
||||
id: composer-cache
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: vendor
|
||||
key: ${{ runner.os }}-php${{ matrix.php }}-${{ hashFiles('**/composer.lock') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-php${{ matrix.php }}-
|
||||
- name: Install Dependencies
|
||||
run: composer install -q --no-ansi --no-interaction --no-scripts --no-progress --prefer-dist
|
||||
- name: Create Database
|
||||
run: |
|
||||
cd tests/Application
|
||||
bin/console doctrine:migration:migrate -n
|
||||
- name: Execute tests (Unit and Feature tests) via PHPUnit
|
||||
run: vendor/bin/phpunit --process-isolation
|
||||
@@ -41,10 +41,15 @@
|
||||
"symfony/dotenv": "^5.4|^6.0",
|
||||
"symfony/runtime": "^5.4|^6.0",
|
||||
"symfony/messenger": "^5.4|^6.0",
|
||||
"symfony/doctrine-messenger": "^5.4|^6.0",
|
||||
"symfony/monolog-bundle": "^3.10",
|
||||
"doctrine/doctrine-bundle": "^2.12",
|
||||
"doctrine/doctrine-migrations-bundle": "^3.3",
|
||||
"doctrine/orm": "^2.19"
|
||||
"doctrine/orm": "^2.19",
|
||||
"phpunit/phpunit": "^9.6"
|
||||
},
|
||||
"conflict": {
|
||||
"win32service/service-library": "<1.0.2"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"
|
||||
|
||||
@@ -113,9 +113,9 @@ class Configuration implements ConfigurationInterface
|
||||
->integerNode('thread_count')->defaultValue(1)->min(1)->end()
|
||||
->booleanNode('delayed_start')->defaultFalse()->end()
|
||||
->integerNode('limit')->defaultValue(0)->min(0)->end()
|
||||
->integerNode('failure-limit')->defaultValue(0)->min(0)->end()
|
||||
->integerNode('time-limit')->defaultValue(0)->min(0)->end()
|
||||
->scalarNode('memory-limit')->defaultValue('')->end()
|
||||
->integerNode('failure_limit')->defaultValue(0)->min(0)->end()
|
||||
->integerNode('time_limit')->defaultValue(0)->min(0)->end()
|
||||
->scalarNode('memory_limit')->defaultValue('')->end()
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
|
||||
@@ -8,6 +8,9 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
|
||||
use Symfony\Component\DependencyInjection\Reference;
|
||||
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
|
||||
|
||||
final class MessengerPass implements CompilerPassInterface
|
||||
{
|
||||
@@ -16,6 +19,13 @@ final class MessengerPass implements CompilerPassInterface
|
||||
private string $win32ServiceRunnerTag = TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG.'.messenger';
|
||||
|
||||
public function process(ContainerBuilder $container): void
|
||||
{
|
||||
$this->processService($container);
|
||||
$this->processRetryConfig($container);
|
||||
$this->processFailledConfig($container);
|
||||
}
|
||||
|
||||
private function processService(ContainerBuilder $container): void
|
||||
{
|
||||
$busIds = [];
|
||||
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
|
||||
@@ -42,6 +52,7 @@ final class MessengerPass implements CompilerPassInterface
|
||||
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);
|
||||
|
||||
$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
|
||||
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));
|
||||
|
||||
$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
|
||||
try {
|
||||
@@ -51,4 +62,34 @@ final class MessengerPass implements CompilerPassInterface
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function processFailledConfig(ContainerBuilder $container): void
|
||||
{
|
||||
if (
|
||||
$container->hasDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') === false
|
||||
|| $container->hasDefinition(SendFailedMessageToFailureTransportListener::class) === false
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
$serviceSF = $container->findDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
|
||||
|
||||
$serviceWin32 = $container->findDefinition(SendFailedMessageToFailureTransportListener::class);
|
||||
$serviceWin32->replaceArgument('$failureSenders', $serviceSF->getArgument(0));
|
||||
}
|
||||
|
||||
private function processRetryConfig(ContainerBuilder $container): void
|
||||
{
|
||||
if (
|
||||
$container->hasDefinition('messenger.retry.send_failed_message_for_retry_listener') === false
|
||||
|| $container->hasDefinition(SendFailedMessageForRetryListener::class) === false
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
$serviceSF = $container->findDefinition('messenger.retry.send_failed_message_for_retry_listener');
|
||||
|
||||
$serviceWin32 = $container->findDefinition(SendFailedMessageForRetryListener::class);
|
||||
$serviceWin32->replaceArgument('$sendersLocator', $serviceSF->getArgument(0));
|
||||
}
|
||||
}
|
||||
|
||||
22
lib/Event/MessengerWorkerStoppedEvent.php
Normal file
22
lib/Event/MessengerWorkerStoppedEvent.php
Normal file
@@ -0,0 +1,22 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Event;
|
||||
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
|
||||
final class MessengerWorkerStoppedEvent
|
||||
{
|
||||
private MessengerServiceRunner $messengerServiceRunner;
|
||||
|
||||
public function __construct(MessengerServiceRunner $messengerServiceRunner)
|
||||
{
|
||||
$this->messengerServiceRunner = $messengerServiceRunner;
|
||||
}
|
||||
|
||||
public function getMessengerServiceRunner(): MessengerServiceRunner
|
||||
{
|
||||
return $this->messengerServiceRunner;
|
||||
}
|
||||
}
|
||||
31
lib/MessengerSubscriber/AddErrorDetailsStampListener.php
Normal file
31
lib/MessengerSubscriber/AddErrorDetailsStampListener.php
Normal file
@@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\MessengerSubscriber;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
|
||||
|
||||
final class AddErrorDetailsStampListener implements EventSubscriberInterface
|
||||
{
|
||||
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
|
||||
{
|
||||
$stamp = ErrorDetailsStamp::create($event->getThrowable());
|
||||
$previousStamp = $event->getEnvelope()->last(ErrorDetailsStamp::class);
|
||||
|
||||
// Do not append duplicate information
|
||||
if ($previousStamp === null || !$previousStamp->equals($stamp)) {
|
||||
$event->addStamps($stamp);
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
// must have higher priority than SendFailedMessageForRetryListener
|
||||
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 200],
|
||||
];
|
||||
}
|
||||
}
|
||||
43
lib/MessengerSubscriber/ResetServicesListener.php
Normal file
43
lib/MessengerSubscriber/ResetServicesListener.php
Normal file
@@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\MessengerSubscriber;
|
||||
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
|
||||
|
||||
/**
|
||||
* @author Grégoire Pineau <lyrixx@lyrixx.info>
|
||||
*/
|
||||
class ResetServicesListener implements EventSubscriberInterface
|
||||
{
|
||||
private ServicesResetter $servicesResetter;
|
||||
|
||||
public function __construct(ServicesResetter $servicesResetter)
|
||||
{
|
||||
$this->servicesResetter = $servicesResetter;
|
||||
}
|
||||
|
||||
public function resetServices(MessengerWorkerRunningEvent $event): void
|
||||
{
|
||||
if (!$event->isWorkerIdle()) {
|
||||
$this->servicesResetter->reset();
|
||||
}
|
||||
}
|
||||
|
||||
public function resetServicesAtStop(MessengerWorkerStoppedEvent $event): void
|
||||
{
|
||||
$this->servicesResetter->reset();
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
MessengerWorkerRunningEvent::class => ['resetServices', -1024],
|
||||
MessengerWorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
|
||||
];
|
||||
}
|
||||
}
|
||||
163
lib/MessengerSubscriber/SendFailedMessageForRetryListener.php
Normal file
163
lib/MessengerSubscriber/SendFailedMessageForRetryListener.php
Normal file
@@ -0,0 +1,163 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\MessengerSubscriber;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
|
||||
use Symfony\Component\Messenger\Exception\RuntimeException;
|
||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
|
||||
|
||||
class SendFailedMessageForRetryListener implements EventSubscriberInterface
|
||||
{
|
||||
private ContainerInterface $sendersLocator;
|
||||
private ContainerInterface $retryStrategyLocator;
|
||||
private ?LoggerInterface $logger;
|
||||
private ?EventDispatcherInterface $eventDispatcher;
|
||||
private int $historySize;
|
||||
|
||||
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, ?LoggerInterface $logger = null, ?EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
|
||||
{
|
||||
$this->sendersLocator = $sendersLocator;
|
||||
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||
$this->logger = $logger;
|
||||
$this->eventDispatcher = $eventDispatcher;
|
||||
$this->historySize = $historySize;
|
||||
}
|
||||
|
||||
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
|
||||
{
|
||||
$retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
|
||||
$envelope = $event->getEnvelope();
|
||||
$throwable = $event->getThrowable();
|
||||
|
||||
$message = $envelope->getMessage();
|
||||
$context = [
|
||||
'class' => $message::class,
|
||||
];
|
||||
|
||||
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
||||
|
||||
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
|
||||
if ($shouldRetry) {
|
||||
$event->setForRetry();
|
||||
|
||||
++$retryCount;
|
||||
|
||||
$delay = $retryStrategy->getWaitingTime($envelope, $throwable);
|
||||
|
||||
if ($this->logger !== null) {
|
||||
$this->logger->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||
}
|
||||
|
||||
// add the delay and retry stamp info
|
||||
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
|
||||
|
||||
// re-send the message for retry
|
||||
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
|
||||
|
||||
if ($this->eventDispatcher !== null) {
|
||||
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
|
||||
}
|
||||
} else {
|
||||
if ($this->logger !== null) {
|
||||
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
// must have higher priority than SendFailedMessageToFailureTransportListener
|
||||
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 100],
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds stamps to the envelope by keeping only the First + Last N stamps.
|
||||
*/
|
||||
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
|
||||
{
|
||||
foreach ($stamps as $stamp) {
|
||||
$history = $envelope->all($stamp::class);
|
||||
if (\count($history) < $this->historySize) {
|
||||
$envelope = $envelope->with($stamp);
|
||||
continue;
|
||||
}
|
||||
|
||||
$history = array_merge(
|
||||
[$history[0]],
|
||||
\array_slice($history, -$this->historySize + 2),
|
||||
[$stamp]
|
||||
);
|
||||
|
||||
$envelope = $envelope->withoutAll($stamp::class)->with(...$history);
|
||||
}
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
||||
{
|
||||
if ($e instanceof RecoverableExceptionInterface) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
|
||||
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
|
||||
if ($e instanceof HandlerFailedException) {
|
||||
$shouldNotRetry = true;
|
||||
foreach ($e->getNestedExceptions() as $nestedException) {
|
||||
if ($nestedException instanceof RecoverableExceptionInterface) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
|
||||
$shouldNotRetry = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ($shouldNotRetry) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if ($e instanceof UnrecoverableExceptionInterface) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return $retryStrategy->isRetryable($envelope, $e);
|
||||
}
|
||||
|
||||
private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
|
||||
{
|
||||
if ($this->retryStrategyLocator->has($alias)) {
|
||||
return $this->retryStrategyLocator->get($alias);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private function getSenderForTransport(string $alias): SenderInterface
|
||||
{
|
||||
if ($this->sendersLocator->has($alias)) {
|
||||
return $this->sendersLocator->get($alias);
|
||||
}
|
||||
|
||||
throw new RuntimeException(\sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\MessengerSubscriber;
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
|
||||
|
||||
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
|
||||
{
|
||||
private ContainerInterface $failureSenders;
|
||||
private ?LoggerInterface $logger;
|
||||
|
||||
public function __construct(ContainerInterface $failureSenders, ?LoggerInterface $logger = null)
|
||||
{
|
||||
$this->failureSenders = $failureSenders;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
|
||||
{
|
||||
if ($event->willRetry()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$this->failureSenders->has($event->getReceiverName())) {
|
||||
return;
|
||||
}
|
||||
|
||||
$failureSender = $this->failureSenders->get($event->getReceiverName());
|
||||
if ($failureSender === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$envelope = $event->getEnvelope();
|
||||
|
||||
// avoid re-sending to the failed sender
|
||||
if ($envelope->last(SentToFailureTransportStamp::class) !== null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$envelope = $envelope->with(
|
||||
new SentToFailureTransportStamp($event->getReceiverName()),
|
||||
new DelayStamp(0),
|
||||
new RedeliveryStamp(0)
|
||||
);
|
||||
|
||||
if ($this->logger !== null) {
|
||||
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
|
||||
'class' => \get_class($envelope->getMessage()),
|
||||
'transport' => $failureSender::class,
|
||||
]);
|
||||
}
|
||||
|
||||
$failureSender->send($envelope);
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', -100],
|
||||
];
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,6 @@ use Symfony\Component\Console\Exception\RuntimeException;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
|
||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
@@ -21,11 +20,14 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Win32Service\Exception\RecoveryActionException;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerMessageHandledEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStartedEvent;
|
||||
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
|
||||
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnFailureLimitListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMemoryLimitListener;
|
||||
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMessageLimitListener;
|
||||
@@ -52,7 +54,7 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
private ?LoggerInterface $logger = null,
|
||||
private array $receiverNames = [],
|
||||
private ?ResetServicesListener $resetServicesListener = null,
|
||||
private array $busIds = []
|
||||
private array $busIds = [],
|
||||
) {
|
||||
$this->unacks = new \SplObjectStorage();
|
||||
}
|
||||
@@ -66,15 +68,15 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
if ($limit > 0) {
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
|
||||
}
|
||||
$failureLimit = (int) $this->config['failure-limit'];
|
||||
$failureLimit = (int) $this->config['failure_limit'];
|
||||
if ($failureLimit > 0) {
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
|
||||
}
|
||||
$timeLimit = (int) $this->config['time-limit'];
|
||||
$timeLimit = (int) $this->config['time_limit'];
|
||||
if ($timeLimit > 0) {
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
|
||||
}
|
||||
$memoryLimit = (string) $this->config['memory-limit'];
|
||||
$memoryLimit = (string) $this->config['memory_limit'];
|
||||
if ($memoryLimit > 0) {
|
||||
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener(
|
||||
$this->convertToBytes($memoryLimit),
|
||||
@@ -85,9 +87,9 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
$this->receivers = [];
|
||||
foreach ($this->config['receivers'] as $receiverName) {
|
||||
if (!$this->receiverLocator->has($receiverName)) {
|
||||
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
|
||||
$message = \sprintf('The receiver "%s" does not exist.', $receiverName);
|
||||
if ($this->receiverNames) {
|
||||
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
|
||||
$message .= \sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
|
||||
}
|
||||
|
||||
throw new RuntimeException($message);
|
||||
@@ -105,7 +107,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
|
||||
$this->shouldStop = true;
|
||||
$this->requestStop();
|
||||
throw new \RuntimeException('Stop requested');
|
||||
|
||||
throw new RecoveryActionException('Restart requested');
|
||||
}
|
||||
|
||||
protected function beforeContinue(): void
|
||||
@@ -153,6 +156,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
|
||||
usleep($sleep);
|
||||
}
|
||||
}
|
||||
|
||||
$this->eventDispatcher->dispatch(new MessengerWorkerStoppedEvent($this));
|
||||
}
|
||||
|
||||
protected function lastRunIsTooSlow(float $duration): void
|
||||
|
||||
@@ -12,3 +12,24 @@ services:
|
||||
arguments:
|
||||
- '%win32service.config%'
|
||||
- '%kernel.environment%'
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\AddErrorDetailsStampListener: ~
|
||||
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener:
|
||||
arguments:
|
||||
$retryStrategyLocator: '@messenger.retry_strategy_locator'
|
||||
$logger: '@logger'
|
||||
$eventDispatcher: '@event_dispatcher'
|
||||
tags:
|
||||
- { name: 'monolog.logger', channel: 'messenger' }
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener:
|
||||
arguments:
|
||||
$logger: '@logger'
|
||||
tags:
|
||||
- {name: 'monolog.logger', channel: 'messenger'}
|
||||
|
||||
Win32ServiceBundle\MessengerSubscriber\ResetServicesListener:
|
||||
arguments:
|
||||
- '@services_resetter'
|
||||
|
||||
@@ -16,7 +16,7 @@ use Win32ServiceBundle\DependencyInjection\TagRunnerCompilerPass;
|
||||
|
||||
class Win32ServiceBundle extends Bundle
|
||||
{
|
||||
public function build(ContainerBuilder $container)
|
||||
public function build(ContainerBuilder $container): void
|
||||
{
|
||||
$autoconfig = $container->registerForAutoconfiguration(RunnerServiceInterface::class);
|
||||
$autoconfig->addTag(TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG);
|
||||
|
||||
25
phpunit.xml.dist
Normal file
25
phpunit.xml.dist
Normal file
@@ -0,0 +1,25 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.6/phpunit.xsd"
|
||||
bootstrap="tests/bootstrap.php"
|
||||
forceCoversAnnotation="false"
|
||||
beStrictAboutCoversAnnotation="true"
|
||||
beStrictAboutOutputDuringTests="true"
|
||||
beStrictAboutTodoAnnotatedTests="true"
|
||||
verbose="true">
|
||||
<coverage processUncoveredFiles="true">
|
||||
<include>
|
||||
<directory suffix=".php">lib</directory>
|
||||
</include>
|
||||
</coverage>
|
||||
<testsuite name="default">
|
||||
<directory suffix="Test.php">tests/Unit</directory>
|
||||
</testsuite>
|
||||
<php>
|
||||
<ini name="display_errors" value="1" />
|
||||
<ini name="error_reporting" value="-1" />
|
||||
<server name="APP_ENV" value="test" force="true" />
|
||||
<env name="SYMFONY_DEPRECATIONS_HELPER" value="disabled"/>
|
||||
<env name="KERNEL_CLASS" value="Win32ServiceBundle\Tests\Application\Kernel"/>
|
||||
</php>
|
||||
</phpunit>
|
||||
2
tests/Application/.env.test
Normal file
2
tests/Application/.env.test
Normal file
@@ -0,0 +1,2 @@
|
||||
|
||||
DATABASE_URL="mysql://root:nopassword@127.0.0.1:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4"
|
||||
@@ -8,11 +8,20 @@ framework:
|
||||
|
||||
transports:
|
||||
# https://symfony.com/doc/current/messenger.html#transport-configuration
|
||||
async: '%env(MESSENGER_TRANSPORT_DSN)%'
|
||||
async:
|
||||
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
|
||||
retry_strategy:
|
||||
max_retries: 1
|
||||
delay: 1000
|
||||
|
||||
failed: 'doctrine://default?queue_name=failed'
|
||||
# sync: 'sync://'
|
||||
|
||||
routing:
|
||||
# Route your messages to the transports
|
||||
# 'App\Message\YourMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestFailedMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestRetryMessage': async
|
||||
'Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage': async
|
||||
|
||||
@@ -6,4 +6,6 @@ win32_service:
|
||||
limit: 10
|
||||
displayed_name: Demo Messenger Consumer Async %d
|
||||
thread_count: 2
|
||||
memory_limit: 128M
|
||||
time_limit: 1
|
||||
|
||||
|
||||
9
tests/Application/src/Event/TestFailedMessage.php
Normal file
9
tests/Application/src/Event/TestFailedMessage.php
Normal file
@@ -0,0 +1,9 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Event;
|
||||
|
||||
final class TestFailedMessage
|
||||
{
|
||||
}
|
||||
12
tests/Application/src/Event/TestMemoryLimitMessage.php
Normal file
12
tests/Application/src/Event/TestMemoryLimitMessage.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Event;
|
||||
|
||||
final class TestMemoryLimitMessage
|
||||
{
|
||||
public function __construct(public int $size)
|
||||
{
|
||||
}
|
||||
}
|
||||
9
tests/Application/src/Event/TestRetryMessage.php
Normal file
9
tests/Application/src/Event/TestRetryMessage.php
Normal file
@@ -0,0 +1,9 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Event;
|
||||
|
||||
final class TestRetryMessage
|
||||
{
|
||||
}
|
||||
12
tests/Application/src/Event/TestTimeLimitMessage.php
Normal file
12
tests/Application/src/Event/TestTimeLimitMessage.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Event;
|
||||
|
||||
final class TestTimeLimitMessage
|
||||
{
|
||||
public function __construct(public int $durationInSeconds)
|
||||
{
|
||||
}
|
||||
}
|
||||
23
tests/Application/src/Handler/FailMessageHandler.php
Normal file
23
tests/Application/src/Handler/FailMessageHandler.php
Normal file
@@ -0,0 +1,23 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Handler;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
|
||||
|
||||
#[AsMessageHandler(fromTransport: 'async')]
|
||||
final class FailMessageHandler
|
||||
{
|
||||
public function __construct(private LoggerInterface $logger)
|
||||
{
|
||||
}
|
||||
|
||||
public function __invoke(TestFailedMessage $message): void
|
||||
{
|
||||
$this->logger->info('Failed Message');
|
||||
throw new \LogicException('Fail to process');
|
||||
}
|
||||
}
|
||||
26
tests/Application/src/Handler/MemoryLimitMessageHandler.php
Normal file
26
tests/Application/src/Handler/MemoryLimitMessageHandler.php
Normal file
@@ -0,0 +1,26 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Handler;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage;
|
||||
|
||||
#[AsMessageHandler(fromTransport: 'async')]
|
||||
final class MemoryLimitMessageHandler
|
||||
{
|
||||
/** @var string Buffer to consume memory to stop service */
|
||||
private string $buffer = '';
|
||||
|
||||
public function __construct(private LoggerInterface $logger)
|
||||
{
|
||||
}
|
||||
|
||||
public function __invoke(TestMemoryLimitMessage $message): void
|
||||
{
|
||||
$this->logger->info('Memory Limit Message : '.$message->size);
|
||||
$this->buffer = str_repeat('-*+45defse', (int) ($message->size / 10));
|
||||
}
|
||||
}
|
||||
24
tests/Application/src/Handler/RetryMessageHandler.php
Normal file
24
tests/Application/src/Handler/RetryMessageHandler.php
Normal file
@@ -0,0 +1,24 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Handler;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
|
||||
|
||||
#[AsMessageHandler(fromTransport: 'async')]
|
||||
final class RetryMessageHandler
|
||||
{
|
||||
public function __construct(private LoggerInterface $logger)
|
||||
{
|
||||
}
|
||||
|
||||
public function __invoke(TestRetryMessage $message): void
|
||||
{
|
||||
$this->logger->info('Retry Message');
|
||||
throw new RecoverableMessageHandlingException('Retry Message');
|
||||
}
|
||||
}
|
||||
23
tests/Application/src/Handler/TimeLimitMessageHandler.php
Normal file
23
tests/Application/src/Handler/TimeLimitMessageHandler.php
Normal file
@@ -0,0 +1,23 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Application\Handler;
|
||||
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage;
|
||||
|
||||
#[AsMessageHandler(fromTransport: 'async')]
|
||||
final class TimeLimitMessageHandler
|
||||
{
|
||||
public function __construct(private LoggerInterface $logger)
|
||||
{
|
||||
}
|
||||
|
||||
public function __invoke(TestTimeLimitMessage $message): void
|
||||
{
|
||||
$this->logger->info('Time Limit Message : '.$message->durationInSeconds);
|
||||
sleep($message->durationInSeconds);
|
||||
}
|
||||
}
|
||||
86
tests/Unit/MessengerIntegration/FaillureRetryMessageTest.php
Normal file
86
tests/Unit/MessengerIntegration/FaillureRetryMessageTest.php
Normal file
@@ -0,0 +1,86 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
|
||||
|
||||
final class FaillureRetryMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testFailureMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestFailedMessage());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(1, 0);
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$msrRefrection = new \ReflectionClass(AbstractServiceRunner::class);
|
||||
$stopRequestedProperty = $msrRefrection->getProperty('stopRequested');
|
||||
$stopRequestedProperty->setAccessible(true);
|
||||
|
||||
Win32serviceState::reset();
|
||||
$stopRequestedProperty->setValue($runner, false);
|
||||
|
||||
usleep(1_500_000);
|
||||
|
||||
$runner->doRun(1, 0);
|
||||
$connexion->commit();
|
||||
$connexion->beginTransaction();
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'failed\' AND delivered_at IS NULL');
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
78
tests/Unit/MessengerIntegration/LimitNbMessageTest.php
Normal file
78
tests/Unit/MessengerIntegration/LimitNbMessageTest.php
Normal file
@@ -0,0 +1,78 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
|
||||
|
||||
final class LimitNbMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testLimitMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messagesTotal = 20;
|
||||
for ($i = 1; $i <= $messagesTotal; ++$i) {
|
||||
$messengerBus->dispatch(new TestMessage('message '.$i));
|
||||
}
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame($messagesTotal, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun($messagesTotal, 0);
|
||||
|
||||
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
|
||||
$value = $rClass->getProperty('stopRequested');
|
||||
$value->setAccessible(true);
|
||||
|
||||
$this->assertTrue($value->getValue($runner));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
|
||||
$this->assertSame(10, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
// Other message has been deleted, only last processed message is keep
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
77
tests/Unit/MessengerIntegration/MemoryLimitMessageTest.php
Normal file
77
tests/Unit/MessengerIntegration/MemoryLimitMessageTest.php
Normal file
@@ -0,0 +1,77 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
|
||||
|
||||
final class MemoryLimitMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testMemoryLimitMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestMemoryLimitMessage( /* 129 Mio */1024 * 1024 * 129));
|
||||
$messengerBus->dispatch(new TestMessage('message 1'));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(2, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(5, 0);
|
||||
|
||||
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
|
||||
$value = $rClass->getProperty('stopRequested');
|
||||
$value->setAccessible(true);
|
||||
|
||||
$this->assertTrue($value->getValue($runner));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
68
tests/Unit/MessengerIntegration/MessageTest.php
Normal file
68
tests/Unit/MessengerIntegration/MessageTest.php
Normal file
@@ -0,0 +1,68 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
|
||||
|
||||
final class MessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testNormalMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestMessage('message 1'));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(1, 0);
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
|
||||
$this->assertSame(0, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
64
tests/Unit/MessengerIntegration/RetryMessageTest.php
Normal file
64
tests/Unit/MessengerIntegration/RetryMessageTest.php
Normal file
@@ -0,0 +1,64 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
|
||||
|
||||
final class RetryMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testRetryMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestRetryMessage());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(1, 0);
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(2, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
77
tests/Unit/MessengerIntegration/TimeLimitMessageTest.php
Normal file
77
tests/Unit/MessengerIntegration/TimeLimitMessageTest.php
Normal file
@@ -0,0 +1,77 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
|
||||
|
||||
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
|
||||
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
|
||||
use Symfony\Component\Messenger\MessageBusInterface;
|
||||
use Win32Service\Model\AbstractServiceRunner;
|
||||
use Win32Service\Model\ServiceIdentifier;
|
||||
use Win32Service\Model\Win32serviceState;
|
||||
use Win32ServiceBundle\Model\MessengerServiceRunner;
|
||||
use Win32ServiceBundle\Service\RunnerManager;
|
||||
use Win32ServiceBundle\Service\ServiceConfigurationManager;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
|
||||
use Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage;
|
||||
|
||||
final class TimeLimitMessageTest extends KernelTestCase
|
||||
{
|
||||
protected function setUp(): void
|
||||
{
|
||||
Win32serviceState::reset();
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->rollBack();
|
||||
}
|
||||
|
||||
public function testTimeLimitMessage(): void
|
||||
{
|
||||
$serviceName = 'win32service.demo.messenger.async.0';
|
||||
self::bootKernel();
|
||||
$container = static::getContainer();
|
||||
|
||||
/** @var Connection $connexion */
|
||||
$connexion = $container->get('doctrine.dbal.default_connection');
|
||||
$connexion->beginTransaction();
|
||||
$connexion->query('DELETE FROM messenger_messages');
|
||||
/** @var MessageBusInterface $messengerBus */
|
||||
$messengerBus = $container->get('messenger.bus.default');
|
||||
$messengerBus->dispatch(new TestTimeLimitMessage(2));
|
||||
$messengerBus->dispatch(new TestMessage('message 1'));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
|
||||
|
||||
$this->assertSame(2, (int) $c->fetchOne());
|
||||
|
||||
$runnerManager = $container->get(RunnerManager::class);
|
||||
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
|
||||
/** @var MessengerServiceRunner $runner */
|
||||
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
|
||||
$runner->setServiceId(new ServiceIdentifier($serviceName));
|
||||
$runner->doRun(5, 0);
|
||||
|
||||
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
|
||||
$value = $rClass->getProperty('stopRequested');
|
||||
$value->setAccessible(true);
|
||||
|
||||
$this->assertTrue($value->getValue($runner));
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
|
||||
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
|
||||
|
||||
$this->assertSame(1, (int) $c->fetchOne());
|
||||
}
|
||||
}
|
||||
85
tests/Win32serviceState.php
Normal file
85
tests/Win32serviceState.php
Normal file
@@ -0,0 +1,85 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
/**
|
||||
* Mock for service library abstact.
|
||||
*/
|
||||
|
||||
namespace Win32Service\Model;
|
||||
|
||||
function win32_start_service_ctrl_dispatcher(string $serviceName): bool
|
||||
{
|
||||
return Win32serviceState::getInstance()->setServiceName($serviceName);
|
||||
}
|
||||
|
||||
function win32_set_service_status(int $newState): void
|
||||
{
|
||||
Win32serviceState::getInstance()->changeState($newState);
|
||||
}
|
||||
|
||||
function win32_get_last_control_message(): int
|
||||
{
|
||||
return Win32serviceState::getInstance()->getLastControlMessage();
|
||||
}
|
||||
|
||||
class Win32serviceState
|
||||
{
|
||||
private static ?self $instance = null;
|
||||
|
||||
private int $state = WIN32_SERVICE_STOPPED;
|
||||
|
||||
private ?string $serviceName = null;
|
||||
|
||||
private int $lastControlMessage = WIN32_SERVICE_CONTROL_INTERROGATE;
|
||||
|
||||
public static function getInstance(): self
|
||||
{
|
||||
if (self::$instance === null) {
|
||||
self::$instance = new self();
|
||||
}
|
||||
|
||||
return self::$instance;
|
||||
}
|
||||
|
||||
public static function reset(): void
|
||||
{
|
||||
self::$instance = null;
|
||||
}
|
||||
|
||||
public function setServiceName(string $serviceName): bool
|
||||
{
|
||||
if ($this->serviceName === null) {
|
||||
$this->serviceName = $serviceName;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public function getServiceName(): string
|
||||
{
|
||||
return $this->serviceName;
|
||||
}
|
||||
|
||||
public function changeState(int $newState): void
|
||||
{
|
||||
$this->state = $newState;
|
||||
}
|
||||
|
||||
public function getState(): int
|
||||
{
|
||||
return $this->state;
|
||||
}
|
||||
|
||||
public function getLastControlMessage(): int
|
||||
{
|
||||
return $this->lastControlMessage;
|
||||
}
|
||||
|
||||
public function setLastControlMessage(int $newControlMessage): void
|
||||
{
|
||||
$this->lastControlMessage = $newControlMessage;
|
||||
}
|
||||
}
|
||||
13
tests/bootstrap.php
Normal file
13
tests/bootstrap.php
Normal file
@@ -0,0 +1,13 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Symfony\Component\Dotenv\Dotenv;
|
||||
|
||||
require \dirname(__DIR__).'/vendor/autoload.php';
|
||||
|
||||
if (file_exists(__DIR__.'/Application/config/bootstrap.php')) {
|
||||
require __DIR__.'/Application/config/bootstrap.php';
|
||||
} elseif (method_exists(Dotenv::class, 'bootEnv')) {
|
||||
(new Dotenv())->bootEnv(__DIR__.'/Application/.env', 'test');
|
||||
}
|
||||
Reference in New Issue
Block a user