issue #17 add code to retry and PHPUnit for tests

This commit is contained in:
jb cr
2024-10-22 10:50:48 +02:00
parent 26ab698a72
commit ccf379dce7
18 changed files with 653 additions and 1 deletions

53
.github/workflow/quality.yaml vendored Normal file
View File

@@ -0,0 +1,53 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: Quality
on:
push:
branches: [ "2.x" ]
pull_request:
branches: [ "2.x" ]
permissions:
contents: read
jobs:
symfony-tests:
matrix:
php: [8.0, 8.1, 8.2, 8.3]
runs-on: ubuntu-latest
steps:
# To automatically get bug fixes and new Php versions for shivammathur/setup-php,
# change this to (see https://github.com/shivammathur/setup-php#bookmark-versioning):
# uses: shivammathur/setup-php@v2
- uses: shivammathur/setup-php@2cb9b829437ee246e9b3cac53555a39208ca6d28
with:
php-version: ${{ matrix.php }}
coverage: xdebug
- uses: actions/checkout@v4
# - name: Copy .env.test.local
# run: php -r "file_exists('.env.test.local') || copy('.env.test', '.env.test.local');"
- name: Cache Composer packages
id: composer-cache
uses: actions/cache@v3
with:
path: vendor
key: ${{ runner.os }}-php-${{ hashFiles('**/composer.lock') }}
restore-keys: |
${{ runner.os }}-php-
- name: Install Dependencies
run: composer install -q --no-ansi --no-interaction --no-scripts --no-progress --prefer-dist
- name: Create Database
run: |
mkdir -p data
touch data/database.sqlite
- name: Execute tests (Unit and Feature tests) via PHPUnit
env:
DATABASE_URL: sqlite:///%kernel.project_dir%/data/database.sqlite
run: |
bin/console doctrine:schema:update --force --complete
vendor/bin/phpunit

View File

@@ -44,7 +44,8 @@
"symfony/monolog-bundle": "^3.10",
"doctrine/doctrine-bundle": "^2.12",
"doctrine/doctrine-migrations-bundle": "^3.3",
"doctrine/orm": "^2.19"
"doctrine/orm": "^2.19",
"phpunit/phpunit": "^9.6"
},
"suggest": {
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"

View File

@@ -8,6 +8,8 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
use Symfony\Component\DependencyInjection\Reference;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
final class MessengerPass implements CompilerPassInterface
{
@@ -16,6 +18,13 @@ final class MessengerPass implements CompilerPassInterface
private string $win32ServiceRunnerTag = TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG.'.messenger';
public function process(ContainerBuilder $container): void
{
$this->processService($container);
$this->processRetryConfig($container);
$this->processFailledConfig($container);
}
private function processService(ContainerBuilder $container): void
{
$busIds = [];
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
@@ -51,4 +60,34 @@ final class MessengerPass implements CompilerPassInterface
}
}
}
private function processFailledConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') === false
|| $container->hasDefinition(SendFailedMessageToFailureTransportListener::class) === false
) {
return;
}
$serviceSF = $container->findDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$serviceWin32 = $container->findDefinition(SendFailedMessageToFailureTransportListener::class);
$serviceWin32->replaceArgument('$failureSenders', $serviceSF->getArgument(0));
}
private function processRetryConfig(ContainerBuilder $container): void
{
if (
$container->hasDefinition('messenger.retry.send_failed_message_for_retry_listener') === false
|| $container->hasDefinition(SendFailedMessageForRetryListener::class) === false
) {
return;
}
$serviceSF = $container->findDefinition('messenger.retry.send_failed_message_for_retry_listener');
$serviceWin32 = $container->findDefinition(SendFailedMessageForRetryListener::class);
$serviceWin32->replaceArgument('$sendersLocator', $serviceSF->getArgument(0));
}
}

View File

@@ -0,0 +1,22 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Event;
use Win32ServiceBundle\Model\MessengerServiceRunner;
final class MessengerWorkerStoppedEvent
{
private MessengerServiceRunner $messengerServiceRunner;
public function __construct(MessengerServiceRunner $messengerServiceRunner)
{
$this->messengerServiceRunner = $messengerServiceRunner;
}
public function getMessengerServiceRunner(): MessengerServiceRunner
{
return $this->messengerServiceRunner;
}
}

View File

@@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
final class AddErrorDetailsStampListener implements EventSubscriberInterface
{
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
$stamp = ErrorDetailsStamp::create($event->getThrowable());
$previousStamp = $event->getEnvelope()->last(ErrorDetailsStamp::class);
// Do not append duplicate information
if ($previousStamp === null || !$previousStamp->equals($stamp)) {
$event->addStamps($stamp);
}
}
public static function getSubscribedEvents(): array
{
return [
// must have higher priority than SendFailedMessageForRetryListener
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 200],
];
}
}

View File

@@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
use Symfony\Component\Messenger\Event\MessengerWorkerStoppedEvent;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class ResetServicesListener implements EventSubscriberInterface
{
private ServicesResetter $servicesResetter;
public function __construct(ServicesResetter $servicesResetter)
{
$this->servicesResetter = $servicesResetter;
}
public function resetServices(MessengerWorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle()) {
$this->servicesResetter->reset();
}
}
public function resetServicesAtStop(MessengerWorkerStoppedEvent $event): void
{
$this->servicesResetter->reset();
}
public static function getSubscribedEvents(): array
{
return [
MessengerWorkerRunningEvent::class => ['resetServices', -1024],
MessengerWorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
];
}
}

View File

@@ -0,0 +1,163 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
class SendFailedMessageForRetryListener implements EventSubscriberInterface
{
private ContainerInterface $sendersLocator;
private ContainerInterface $retryStrategyLocator;
private ?LoggerInterface $logger;
private ?EventDispatcherInterface $eventDispatcher;
private int $historySize;
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, ?LoggerInterface $logger = null, ?EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
$this->historySize = $historySize;
}
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
$retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
$envelope = $event->getEnvelope();
$throwable = $event->getThrowable();
$message = $envelope->getMessage();
$context = [
'class' => $message::class,
];
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
if ($shouldRetry) {
$event->setForRetry();
++$retryCount;
$delay = $retryStrategy->getWaitingTime($envelope, $throwable);
if ($this->logger !== null) {
$this->logger->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
// add the delay and retry stamp info
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
if ($this->eventDispatcher !== null) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if ($this->logger !== null) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
}
public static function getSubscribedEvents(): array
{
return [
// must have higher priority than SendFailedMessageToFailureTransportListener
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 100],
];
}
/**
* Adds stamps to the envelope by keeping only the First + Last N stamps.
*/
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
{
foreach ($stamps as $stamp) {
$history = $envelope->all($stamp::class);
if (\count($history) < $this->historySize) {
$envelope = $envelope->with($stamp);
continue;
}
$history = array_merge(
[$history[0]],
\array_slice($history, -$this->historySize + 2),
[$stamp]
);
$envelope = $envelope->withoutAll($stamp::class)->with(...$history);
}
return $envelope;
}
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
{
if ($e instanceof RecoverableExceptionInterface) {
return true;
}
// if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
// if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
if ($e instanceof HandlerFailedException) {
$shouldNotRetry = true;
foreach ($e->getNestedExceptions() as $nestedException) {
if ($nestedException instanceof RecoverableExceptionInterface) {
return true;
}
if (!$nestedException instanceof UnrecoverableExceptionInterface) {
$shouldNotRetry = false;
break;
}
}
if ($shouldNotRetry) {
return false;
}
}
if ($e instanceof UnrecoverableExceptionInterface) {
return false;
}
return $retryStrategy->isRetryable($envelope, $e);
}
private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
{
if ($this->retryStrategyLocator->has($alias)) {
return $this->retryStrategyLocator->get($alias);
}
return null;
}
private function getSenderForTransport(string $alias): SenderInterface
{
if ($this->sendersLocator->has($alias)) {
return $this->sendersLocator->get($alias);
}
throw new RuntimeException(\sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
}
}

View File

@@ -0,0 +1,70 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\MessengerSubscriber;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
{
private ContainerInterface $failureSenders;
private ?LoggerInterface $logger;
public function __construct(ContainerInterface $failureSenders, ?LoggerInterface $logger = null)
{
$this->failureSenders = $failureSenders;
$this->logger = $logger;
}
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
{
if ($event->willRetry()) {
return;
}
if (!$this->failureSenders->has($event->getReceiverName())) {
return;
}
$failureSender = $this->failureSenders->get($event->getReceiverName());
if ($failureSender === null) {
return;
}
$envelope = $event->getEnvelope();
// avoid re-sending to the failed sender
if ($envelope->last(SentToFailureTransportStamp::class) !== null) {
return;
}
$envelope = $envelope->with(
new SentToFailureTransportStamp($event->getReceiverName()),
new DelayStamp(0),
new RedeliveryStamp(0)
);
if ($this->logger !== null) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
'transport' => $failureSender::class,
]);
}
$failureSender->send($envelope);
}
public static function getSubscribedEvents(): array
{
return [
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', -100],
];
}
}

View File

@@ -26,6 +26,7 @@ use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
use Win32ServiceBundle\Event\MessengerWorkerMessageHandledEvent;
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
use Win32ServiceBundle\Event\MessengerWorkerStartedEvent;
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnFailureLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMemoryLimitListener;
use Win32ServiceBundle\MessengerSubscriber\StopWorkerOnMessageLimitListener;
@@ -152,6 +153,8 @@ final class MessengerServiceRunner extends AbstractServiceRunner
usleep($sleep);
}
}
$this->eventDispatcher->dispatch(new MessengerWorkerStoppedEvent($this));
}
protected function lastRunIsTooSlow(float $duration): void

View File

@@ -12,3 +12,22 @@ services:
arguments:
- '%win32service.config%'
- '%kernel.environment%'
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener:
arguments:
$retryStrategyLocator: '@messenger.retry_strategy_locator'
$logger: '@logger'
$eventDispatcher: '@event_dispatcher'
tags:
- { name: 'monolog.logger', channel: 'messenger' }
Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener:
arguments:
$logger: '@logger'
tags:
- {name: 'monolog.logger', channel: 'messenger'}
Win32ServiceBundle\MessengerSubscriber\ResetServicesListener:
arguments:
- '@services_resetter'

25
phpunit.xml.dist Normal file
View File

@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.6/phpunit.xsd"
bootstrap="tests/bootstrap.php"
forceCoversAnnotation="false"
beStrictAboutCoversAnnotation="true"
beStrictAboutOutputDuringTests="true"
beStrictAboutTodoAnnotatedTests="true"
verbose="true">
<coverage processUncoveredFiles="true">
<include>
<directory suffix=".php">lib</directory>
</include>
</coverage>
<testsuite name="default">
<directory suffix="Test.php">tests/Unit</directory>
</testsuite>
<php>
<ini name="display_errors" value="1" />
<ini name="error_reporting" value="-1" />
<server name="APP_ENV" value="test" force="true" />
<env name="SYMFONY_DEPRECATIONS_HELPER" value="disabled"/>
<env name="KERNEL_CLASS" value="Win32ServiceBundle\Tests\Application\Kernel"/>
</php>
</phpunit>

View File

@@ -0,0 +1,2 @@
DATABASE_URL="mysql://root:nopassword@127.0.0.1:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4"

View File

@@ -16,3 +16,4 @@ framework:
# Route your messages to the transports
# 'App\Message\YourMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestMessage': async
'Win32ServiceBundle\Tests\Application\Event\TestRetryMessage': async

View File

@@ -0,0 +1,9 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Event;
final class TestRetryMessage
{
}

View File

@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Application\Handler;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
#[AsMessageHandler(fromTransport: 'async')]
final class RetryMessageHandler
{
public function __construct(private LoggerInterface $logger)
{
}
public function __invoke(TestRetryMessage $message): void
{
$this->logger->info('Retry Message');
throw new RecoverableMessageHandlingException('Retry Message');
}
}

View File

@@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
namespace Win32ServiceBundle\Tests\Unit\MessengerIntegration;
require_once \dirname(__DIR__, 2).'/win32service_mock_function.php';
use Doctrine\DBAL\Driver\Connection;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
use Symfony\Component\Messenger\MessageBusInterface;
use Win32Service\Model\ServiceIdentifier;
use Win32ServiceBundle\Model\MessengerServiceRunner;
use Win32ServiceBundle\Service\RunnerManager;
use Win32ServiceBundle\Service\ServiceConfigurationManager;
use Win32ServiceBundle\Tests\Application\Event\TestRetryMessage;
final class RetryMessageTest extends KernelTestCase
{
protected function tearDown(): void
{
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->rollBack();
}
public function testRetryMessage(): void
{
$serviceName = 'win32service.demo.messenger.async.0';
self::bootKernel();
$container = static::getContainer();
/** @var Connection $connexion */
$connexion = $container->get('doctrine.dbal.default_connection');
$connexion->beginTransaction();
$connexion->query('DELETE FROM messenger_messages');
/** @var MessageBusInterface $messengerBus */
$messengerBus = $container->get('messenger.bus.default');
$messengerBus->dispatch(new TestRetryMessage());
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(1, (int) $c->fetchOne());
$runnerManager = $container->get(RunnerManager::class);
$serviceConfigurationManager = $container->get(ServiceConfigurationManager::class);
/** @var MessengerServiceRunner $runner */
$runner = $runnerManager->getRunner($serviceConfigurationManager->getRunnerAliasForServiceId($serviceName));
$runner->setServiceId(new ServiceIdentifier($serviceName));
$runner->doRun(1, 0);
$c = $connexion->query('SELECT count(*) FROM messenger_messages WHERE queue_name = \'default\'');
$this->assertSame(2, (int) $c->fetchOne());
}
}

13
tests/bootstrap.php Normal file
View File

@@ -0,0 +1,13 @@
<?php
declare(strict_types=1);
use Symfony\Component\Dotenv\Dotenv;
require \dirname(__DIR__).'/vendor/autoload.php';
if (file_exists(__DIR__.'/Application/config/bootstrap.php')) {
require __DIR__.'/Application/config/bootstrap.php';
} elseif (method_exists(Dotenv::class, 'bootEnv')) {
(new Dotenv())->bootEnv(__DIR__.'/Application/.env', 'test');
}

View File

@@ -0,0 +1,76 @@
<?php
declare(strict_types=1);
namespace Win32Service\Model;
function win32_start_service_ctrl_dispatcher(string $serviceName): bool
{
return Win32serviceState::getInstance()->setServiceName($serviceName);
}
function win32_set_service_status(int $newState): void
{
Win32serviceState::getInstance()->changeState($newState);
}
function win32_get_last_control_message(): int
{
return Win32serviceState::getInstance()->getLastControlMessage();
}
class win32service_mock_function
{
private static ?self $instance = null;
private int $state = WIN32_SERVICE_STOPPED;
private ?string $serviceName = null;
private int $lastControlMessage = WIN32_SERVICE_CONTROL_INTERROGATE;
public static function getInstance(): Win32serviceState
{
if (self::$instance === null) {
self::$instance = new self();
}
return self::$instance;
}
public function setServiceName(string $serviceName): bool
{
if ($this->serviceName === null) {
$this->serviceName = $serviceName;
return true;
}
return false;
}
public function getServiceName(): string
{
return $this->serviceName;
}
public function changeState(int $newState): void
{
$this->state = $newState;
}
public function getState(): int
{
return $this->state;
}
public function getLastControlMessage(): int
{
return $this->lastControlMessage;
}
public function setLastControlMessage(int $newControlMessage): void
{
$this->lastControlMessage = $newControlMessage;
}
}