20 Commits
v2.0.2 ... 2.x

Author SHA1 Message Date
macintoshplus
f0f241edc6 use RecoveryActionException to request Mesenger consumer restart 2024-11-29 18:02:37 +01:00
Jean-Baptiste Nahan
9062ff5702 Merge pull request #22 from jbcr/issue_17_failed_message_never_retried_suite
[issue #17] fix retry and faillure queue
2024-10-23 15:23:48 +02:00
jb cr
2927652370 fix failled test 2024-10-23 15:20:50 +02:00
jb cr
5ba4f88608 work in progresse for test faillure queue 2024-10-23 15:12:01 +02:00
jb cr
82d358dc37 update cache action version and cache key 2024-10-23 15:11:52 +02:00
jb cr
0de58e5c83 add tests 2024-10-23 15:11:52 +02:00
jb cr
733080ef3b revert mock namespace 2024-10-23 15:11:52 +02:00
jb cr
250f445374 fix mock 2024-10-23 15:11:51 +02:00
jb cr
221212eb93 add symfony/doctrine-messenger 2024-10-23 15:11:51 +02:00
jb cr
be9562e83c change db name and APP_ENV value 2024-10-23 15:11:51 +02:00
jb cr
6b4086e927 try fix mariadb port 2024-10-23 15:11:51 +02:00
jb cr
51b359dcf8 fix env config 2024-10-23 15:11:50 +02:00
jb cr
84ccbda344 use mariadb 2024-10-23 15:11:50 +02:00
jb cr
9c940bccaa fix database path 2024-10-23 15:11:50 +02:00
jb cr
47b5590267 fix console path 2024-10-23 15:11:50 +02:00
jb cr
551a35255e set PHP version as string instead of float 2024-10-23 15:11:50 +02:00
jb cr
6599c306ae fix matrix config 2024-10-23 15:11:50 +02:00
jb cr
5863dc577c move workflow config 2024-10-23 15:11:49 +02:00
jb cr
114c232a25 add branch for run action 2024-10-23 15:11:49 +02:00
jb cr
ccf379dce7 issue #17 add code to retry and PHPUnit for tests 2024-10-23 15:11:49 +02:00
31 changed files with 1198 additions and 5 deletions

68
.github/workflows/quality.yaml vendored Normal file
View File

@@ -0,0 +1,68 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: Quality
on:
push:
branches: [ "2.x", "issue_17_failed_message_never_retried" ]
pull_request:
branches: [ "2.x" ]
permissions:
contents: read
jobs:
symfony-tests:
strategy:
fail-fast: false
matrix:
php: ['8.0', '8.1', '8.2', '8.3']
runs-on: ubuntu-latest
services:
mariadb:
image: mariadb:10.11
ports:
- 3306:3306
env:
MYSQL_USER: user
MYSQL_PASSWORD: nopassword
MYSQL_DATABASE: app_test
MYSQL_ROOT_PASSWORD: nopassword
options: --health-cmd="mysqladmin ping" --health-interval=5s --health-timeout=2s --health-retries=3
env:
DATABASE_URL: mysql://root:nopassword@127.0.0.1:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4
APP_ENV: test
steps:
# To automatically get bug fixes and new Php versions for shivammathur/setup-php,
# change this to (see https://github.com/shivammathur/setup-php#bookmark-versioning):
# uses: shivammathur/setup-php@v2
- uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php }}
coverage: xdebug
- uses: actions/checkout@v4
# - name: Copy .env.test.local
# run: php -r "file_exists('.env.test.local') || copy('.env.test', '.env.test.local');"
- name: Cache Composer packages
id: composer-cache
uses: actions/cache@v4
with:
path: vendor
key: ${{ runner.os }}-php${{ matrix.php }}-${{ hashFiles('**/composer.lock') }}
restore-keys: |
${{ runner.os }}-php${{ matrix.php }}-
- name: Install Dependencies
run: composer install -q --no-ansi --no-interaction --no-scripts --no-progress --prefer-dist
- name: Create Database
run: |
cd tests/Application
bin/console doctrine:migration:migrate -n
- name: Execute tests (Unit and Feature tests) via PHPUnit
run: vendor/bin/phpunit --process-isolation

View File

@@ -41,10 +41,15 @@
"symfony/dotenv": "^5.4|^6.0",
"symfony/runtime": "^5.4|^6.0",
"symfony/messenger": "^5.4|^6.0",
"symfony/doctrine-messenger": "^5.4|^6.0",
"symfony/monolog-bundle": "^3.10",
"doctrine/doctrine-bundle": "^2.12",
"doctrine/doctrine-migrations-bundle": "^3.3",
"doctrine/orm": "^2.19"
"doctrine/orm": "^2.19",
"phpunit/phpunit": "^9.6"
},
"conflict": {
"win32service/service-library": "<1.0.2"
},
"suggest": {
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"

View File

@@ -8,6 +8,9 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
use Symfony\Component\DependencyInjection\Reference;
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
final class MessengerPass implements CompilerPassInterface
{
@@ -16,6 +19,13 @@ final class MessengerPass implements CompilerPassInterface
private string $win32ServiceRunnerTag = TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG.'.messenger';
public function process(ContainerBuilder $container): void
{
$this->processService($container);
$this->processRetryConfig($container);
$this->processFailledConfig($container);
}
private function processService(ContainerBuilder $container): void
{
$busIds = [];
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
@@ -42,6 +52,7 @@ final class MessengerPass implements CompilerPassInterface
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);
$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));
$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
try {
@@ -51,4 +62,34 @@ final class MessengerPass implements CompilerPassInterface
}
}
}
private function processFailledConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') === false
|| $container->hasDefinition(SendFailedMessageToFailureTransportListener::class) === false
) {
return;
}
$serviceSF = $container->findDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$serviceWin32 = $container->findDefinition(SendFailedMessageToFailureTransportListener::class);
$serviceWin32->replaceArgument('$failureSenders', $serviceSF->getArgument(0));
}
private function processRetryConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.retry.send_failed_message_for_retry_listener') === false
|| $container->hasDefinition(SendFailedMessageForRetryListener::class) === false
) {
return;
}
$serviceSF = $container->findDefinition('messenger.retry.send_failed_message_for_retry_listener');
$serviceWin32 = $container->findDefinition(SendFailedMessageForRetryListener::class);
$serviceWin32->replaceArgument('$sendersLocator', $serviceSF->getArgument(0));
}
}

View File

@@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Event;
use Win32ServiceBundle\Model\MessengerServiceRunner;
final class MessengerWorkerStoppedEvent
{
private MessengerServiceRunner $messengerServiceRunner;
public function __construct(MessengerServiceRunner $messengerServiceRunner)
{
$this->messengerServiceRunner = $messengerServiceRunner;
}
public function getMessengerServiceRunner(): MessengerServiceRunner
{
return $this->messengerServiceRunner;
}
}

View File

@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
final class AddErrorDetailsStampListener implements EventSubscriberInterface
{
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
$stamp = ErrorDetailsStamp::create($event->getThrowable());
$previousStamp = $event->getEnvelope()->last(ErrorDetailsStamp::class);
// Do not append duplicate information
if ($previousStamp === null || !$previousStamp->equals($stamp)) {
$event->addStamps($stamp);
}
}
public static function getSubscribedEvents(): array
{
return [
// must have higher priority than SendFailedMessageForRetryListener
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 200],
];
}
}

View File

@@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class ResetServicesListener implements EventSubscriberInterface
{
private ServicesResetter $servicesResetter;
public function __construct(ServicesResetter $servicesResetter)
{
$this->servicesResetter = $servicesResetter;
}
public function resetServices(MessengerWorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle()) {
$this->servicesResetter->reset();
}
}
public function resetServicesAtStop(MessengerWorkerStoppedEvent $event): void
{
$this->servicesResetter->reset();
}
public static function getSubscribedEvents(): array
{
return [
MessengerWorkerRunningEvent::class => ['resetServices', -1024],
MessengerWorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
];
}
}

View File

@@ -0,0 +1,163 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
class SendFailedMessageForRetryListener implements EventSubscriberInterface
{
private ContainerInterface $sendersLocator;
private ContainerInterface $retryStrategyLocator;
private ?LoggerInterface $logger;
private ?EventDispatcherInterface $eventDispatcher;
private int $historySize;
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, ?LoggerInterface $logger = null, ?EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize;
}
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
$retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
$envelope = $event->getEnvelope();
$throwable = $event->getThrowable();
$message = $envelope->getMessage();
$context = [
'class' => $message::class,
];
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
if ($shouldRetry) {
$event->setForRetry();
++$retryCount;
$delay = $retryStrategy->getWaitingTime($envelope, $throwable);
if ($this->logger !== null) {
$this->logger->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
// add the delay and retry stamp info
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if ($this->logger !== null) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
}
public static function getSubscribedEvents(): array
{
return [
// must have higher priority than SendFailedMessageToFailureTransportListener
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 100],
];
}
/**
* Adds stamps to the envelope by keeping only the First + Last N stamps.
*/
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
{
foreach ($stamps as $stamp) {
$history = $envelope->all($stamp::class);
if (\count($history) < $this->historySize) {
$envelope = $envelope->with($stamp);
continue;
}
$history = array_merge(
[$history[0]],
\array_slice($history, -$this->historySize + 2),
[$stamp]
);
$envelope = $envelope->withoutAll($stamp::class)->with(...$history);
}
return $envelope;
}
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
{
if ($e instanceof RecoverableExceptionInterface) {
return true;
}
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
if ($e instanceof HandlerFailedException) {
$shouldNotRetry = true;
foreach ($e->getNestedExceptions() as $nestedException) {
if ($nestedException instanceof RecoverableExceptionInterface) {
return true;
}
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
$shouldNotRetry = false;
break;
}
}
if ($shouldNotRetry) {
return false;
}
}
if ($e instanceof UnrecoverableExceptionInterface) {
return false;
}
return $retryStrategy->isRetryable($envelope, $e);
}
private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
{
if ($this->retryStrategyLocator->has($alias)) {
return $this->retryStrategyLocator->get($alias);
}
return null;
}
private function getSenderForTransport(string $alias): SenderInterface
{
if ($this->sendersLocator->has($alias)) {
return $this->sendersLocator->get($alias);
}
throw new RuntimeException(\sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
}
}

View File

@@ -0,0 +1,70 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{
private ContainerInterface $failureSenders;
private ?LoggerInterface $logger;
public function __construct(ContainerInterface $failureSenders, ?LoggerInterface $logger = null)
{
$this->failureSenders = $failureSenders;
$this->logger = $logger;
}
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
if ($event->willRetry()) {
return;
}
if (!$this->failureSenders->has($event->getReceiverName())) {
return;
}
$failureSender = $this->failureSenders->get($event->getReceiverName());
if ($failureSender === null) {
return;
}
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
if ($envelope->last(SentToFailureTransportStamp::class) !== null) {
return;
}
$envelope = $envelope->with(
new SentToFailureTransportStamp($event->getReceiverName()),
new DelayStamp(0),
new RedeliveryStamp(0)
);
if ($this->logger !== null) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
'transport' => $failureSender::class,
]);
}
$failureSender->send($envelope);
}
public static function getSubscribedEvents(): array
{
return [
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', -100],
];
}
}

View File

@@ -10,7 +10,6 @@ use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\MessageBusInterface;
@@ -21,11 +20,14 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Win32Service\Exception\RecoveryActionException;
use Win32Service\Model\AbstractServiceRunner;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
use Win32ServiceBundle\Event\MessengerWorkerMessageHandledEvent;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStartedEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnFailureLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMemoryLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMessageLimitListener;
@@ -105,6 +107,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
$this->shouldStop = true;
$this->requestStop();
throw new RecoveryActionException('Restart requested');
}
protected function beforeContinue(): void
@@ -152,6 +156,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
usleep($sleep);
}
}
$this->eventDispatcher->dispatch(new MessengerWorkerStoppedEvent($this));
}
protected function lastRunIsTooSlow(float $duration): void

View File

@@ -12,3 +12,24 @@ services:
arguments:
- '%win32service.config%'
- '%kernel.environment%'
Win32ServiceBundle\MessengerSubscriber\AddErrorDetailsStampListener: ~
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener:
arguments:
$retryStrategyLocator: '@messenger.retry_strategy_locator'
$logger: '@logger'
$eventDispatcher: '@event_dispatcher'
tags:
- { name: 'monolog.logger', channel: 'messenger' }
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener:
arguments:
$logger: '@logger'
tags:
- {name: 'monolog.logger', channel: 'messenger'}
Win32ServiceBundle\MessengerSubscriber\ResetServicesListener:
arguments:
- '@services_resetter'

View File

@@ -16,7 +16,7 @@ use Win32ServiceBundle\DependencyInjection\TagRunnerCompilerPass;
class Win32ServiceBundle extends Bundle
{
public function build(ContainerBuilder $container)
public function build(ContainerBuilder $container): void
{
$autoconfig = $container->registerForAutoconfiguration(RunnerServiceInterface::class);
$autoconfig->addTag(TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG);

25
phpunit.xml.dist Normal file
View File

@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.6/phpunit.xsd"
bootstrap="tests/bootstrap.php"
forceCoversAnnotation="false"
beStrictAboutCoversAnnotation="true"
beStrictAboutOutputDuringTests="true"
beStrictAboutTodoAnnotatedTests="true"
verbose="true">
<coverage processUncoveredFiles="true">
<include>
<directory suffix=".php">lib</directory>
</include>
</coverage>
<testsuite name="default">
<directory suffix="Test.php">tests/Unit</directory>
</testsuite>
<php>
<ini name="display_errors" value="1" />
<ini name="error_reporting" value="-1" />
<server name="APP_ENV" value="test" force="true" />
<env name="SYMFONY_DEPRECATIONS_HELPER" value="disabled"/>
<env name="KERNEL_CLASS" value="Win32ServiceBundle\Tests\Application\Kernel"/>
</php>
</phpunit>

View File

@@ -0,0 +1,2 @@
DATABASE_URL="mysql://root:nopassword@127.0.0.1:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4"

View File

@@ -8,11 +8,20 @@ framework:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async: '%env(MESSENGER_TRANSPORT_DSN)%'
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 1
delay: 1000
failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
routing:
# Route your messages to the transports
# 'App\Message\YourMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestFailedMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestRetryMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage': async

View File

@@ -6,5 +6,6 @@ win32_service:
limit: 10
displayed_name: Demo Messenger Consumer Async %d
thread_count: 2
memory_limit: 3600
memory_limit: 128M
time_limit: 1

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestFailedMessage
{
}

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestMemoryLimitMessage
{
public function __construct(public int $size)
{
}
}

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestRetryMessage
{
}

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestTimeLimitMessage
{
public function __construct(public int $durationInSeconds)
{
}
}

View File

@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class FailMessageHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestFailedMessage $message): void
{
$this->logger->info('Failed Message');
throw new \LogicException('Fail to process');
}
}

View File

@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class MemoryLimitMessageHandler
{
/** @var string Buffer to consume memory to stop service */
private string $buffer = '';
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestMemoryLimitMessage $message): void
{
$this->logger->info('Memory Limit Message : '.$message->size);
$this->buffer = str_repeat('-*+45defse', (int) ($message->size / 10));
}
}

View File

@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class RetryMessageHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestRetryMessage $message): void
{
$this->logger->info('Retry Message');
throw new RecoverableMessageHandlingException('Retry Message');
}
}

View File

@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class TimeLimitMessageHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestTimeLimitMessage $message): void
{
$this->logger->info('Time Limit Message : '.$message->durationInSeconds);
sleep($message->durationInSeconds);
}
}

View File

@@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\AbstractServiceRunner;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestFailedMessage;
final class FaillureRetryMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testFailureMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestFailedMessage());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(1, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(1, 0);
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
$msrRefrection = new \ReflectionClass(AbstractServiceRunner::class);
$stopRequestedProperty = $msrRefrection->getProperty('stopRequested');
$stopRequestedProperty->setAccessible(true);
Win32serviceState::reset();
$stopRequestedProperty->setValue($runner, false);
usleep(1_500_000);
$runner->doRun(1, 0);
$connexion->commit();
$connexion->beginTransaction();
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'failed\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,78 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\AbstractServiceRunner;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
final class LimitNbMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testLimitMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messagesTotal = 20;
for ($i = 1; $i <= $messagesTotal; ++$i) {
$messengerBus->dispatch(new TestMessage('message '.$i));
}
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame($messagesTotal, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun($messagesTotal, 0);
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
$value = $rClass->getProperty('stopRequested');
$value->setAccessible(true);
$this->assertTrue($value->getValue($runner));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(10, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
// Other message has been deleted, only last processed message is keep
$this->assertSame(1, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,77 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\AbstractServiceRunner;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestMemoryLimitMessage;
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
final class MemoryLimitMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testMemoryLimitMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestMemoryLimitMessage( /* 129 Mio */1024 * 1024 * 129));
$messengerBus->dispatch(new TestMessage('message 1'));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(2, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(5, 0);
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
$value = $rClass->getProperty('stopRequested');
$value->setAccessible(true);
$this->assertTrue($value->getValue($runner));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,68 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
final class MessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testNormalMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestMessage('message 1'));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(1, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(1, 0);
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(0, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,64 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
final class RetryMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testRetryMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestRetryMessage());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(1, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(1, 0);
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(2, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,77 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/Win32serviceState.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\AbstractServiceRunner;
use Win32Service\Model\ServiceIdentifier;
use Win32Service\Model\Win32serviceState;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestMessage;
use Win32ServiceBundle\Tests\Application\Event\TestTimeLimitMessage;
final class TimeLimitMessageTest extends KernelTestCase
{
protected function setUp(): void
{
Win32serviceState::reset();
}
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testTimeLimitMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestTimeLimitMessage(2));
$messengerBus->dispatch(new TestMessage('message 1'));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(2, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(5, 0);
$rClass = new \ReflectionClass(AbstractServiceRunner::class);
$value = $rClass->getProperty('stopRequested');
$value->setAccessible(true);
$this->assertTrue($value->getValue($runner));
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NULL');
$this->assertSame(1, (int) $c->fetchOne());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\' AND delivered_at IS NOT NULL');
$this->assertSame(1, (int) $c->fetchOne());
}
}

View File

@@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
/**
* Mock for service library abstact.
*/
namespace Win32Service\Model;
function win32_start_service_ctrl_dispatcher(string $serviceName): bool
{
return Win32serviceState::getInstance()->setServiceName($serviceName);
}
function win32_set_service_status(int $newState): void
{
Win32serviceState::getInstance()->changeState($newState);
}
function win32_get_last_control_message(): int
{
return Win32serviceState::getInstance()->getLastControlMessage();
}
class Win32serviceState
{
private static ?self $instance = null;
private int $state = WIN32_SERVICE_STOPPED;
private ?string $serviceName = null;
private int $lastControlMessage = WIN32_SERVICE_CONTROL_INTERROGATE;
public static function getInstance(): self
{
if (self::$instance === null) {
self::$instance = new self();
}
return self::$instance;
}
public static function reset(): void
{
self::$instance = null;
}
public function setServiceName(string $serviceName): bool
{
if ($this->serviceName === null) {
$this->serviceName = $serviceName;
return true;
}
return false;
}
public function getServiceName(): string
{
return $this->serviceName;
}
public function changeState(int $newState): void
{
$this->state = $newState;
}
public function getState(): int
{
return $this->state;
}
public function getLastControlMessage(): int
{
return $this->lastControlMessage;
}
public function setLastControlMessage(int $newControlMessage): void
{
$this->lastControlMessage = $newControlMessage;
}
}

13
tests/bootstrap.php Normal file
View File

@@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
use Symfony\Component\Dotenv\Dotenv;
require \dirname(__DIR__).'/vendor/autoload.php';
if (file_exists(__DIR__.'/Application/config/bootstrap.php')) {
require __DIR__.'/Application/config/bootstrap.php';
} elseif (method_exists(Dotenv::class, 'bootEnv')) {
(new Dotenv())->bootEnv(__DIR__.'/Application/.env', 'test');
}