[Scheduler] Rework the component

This commit is contained in:
Fabien Potencier
2023-01-21 09:32:23 +01:00
parent b1d04364a9
commit 3d48efb2c5
45 changed files with 960 additions and 1567 deletions

28
Attribute/AsSchedule.php Normal file
View File

@@ -0,0 +1,28 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Attribute;
/**
* Service tag to autoconfigure schedules.
*
* @author Fabien Potencier <fabien@symfony.com>
*
* @experimental
*/
#[\Attribute(\Attribute::TARGET_CLASS)]
class AsSchedule
{
public function __construct(
public string $name = 'default',
) {
}
}

View File

@@ -4,4 +4,4 @@ CHANGELOG
6.3
---
* Add the component
* Add the component as experimental

View File

@@ -0,0 +1,51 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\DependencyInjection;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @internal
*/
class AddScheduleMessengerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
$receivers = [];
foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) {
$receivers[$tags[0]['alias']] = true;
}
foreach ($container->findTaggedServiceIds('scheduler.schedule_provider') as $tags) {
$name = $tags[0]['name'];
$transportName = 'scheduler_'.$name;
// allows to override the default transport registration
// in case one needs to configure it further (like choosing a different serializer)
if (isset($receivers[$transportName])) {
continue;
}
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
->setArguments(['schedule://'.$name, ['transport_name' => $transportName], new Reference('messenger.default_serializer')])
->addTag('messenger.receiver', ['alias' => $transportName])
;
$container->setDefinition($transportId = 'messenger.transport.'.$transportName, $transportDefinition);
$senderAliases[$transportName] = $transportId;
}
}
}

View File

@@ -1,125 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\DependencyInjection;
use Symfony\Component\Cache\CacheItem;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Scheduler\Messenger\ScheduleTransportFactory;
use Symfony\Contracts\Cache\CacheInterface;
class SchedulerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
{
$usedCachePools = [];
$usedLockFactories = [];
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
$transport = $container->getDefinition($id);
[$dsn, $options] = $transport->getArguments();
if (!ScheduleTransportFactory::isSupported($dsn)) {
continue;
}
if (\is_string($options['cache'] ?? null) && $options['cache']) {
$usedCachePools[] = $options['cache'];
}
if (\is_string($options['lock'] ?? null) && $options['lock']) {
$usedLockFactories[] = $options['lock'];
}
if (\is_array($options['lock'] ?? null) &&
\is_string($options['lock']['resource'] ?? null) &&
$options['lock']['resource']
) {
$usedLockFactories[] = $options['lock']['resource'];
}
}
if ($usedCachePools) {
$this->locateCachePools($container, $usedCachePools);
}
if ($usedLockFactories) {
$this->locateLockFactories($container, $usedLockFactories);
}
}
/**
* @param string[] $cachePools
*/
private function locateCachePools(ContainerBuilder $container, array $cachePools): void
{
if (!class_exists(CacheItem::class)) {
throw new \LogicException('You cannot use the "cache" option if the Cache Component is not available. Try running "composer require symfony/cache".');
}
$references = [];
foreach (array_unique($cachePools) as $name) {
if (!$this->isServiceInstanceOf($container, $id = $name, CacheInterface::class) &&
!$this->isServiceInstanceOf($container, $id = 'cache.'.$name, CacheInterface::class)
) {
throw new RuntimeException(sprintf('The cache pool "%s" does not exist.', $name));
}
$references[$name] = new Reference($id);
}
$container->getDefinition('scheduler.cache_locator')
->replaceArgument(0, $references);
}
/**
* @param string[] $lockFactories
*/
private function locateLockFactories(ContainerBuilder $container, array $lockFactories): void
{
if (!class_exists(LockFactory::class)) {
throw new \LogicException('You cannot use the "lock" option if the Lock Component is not available. Try running "composer require symfony/lock".');
}
$references = [];
foreach (array_unique($lockFactories) as $name) {
if (!$this->isServiceInstanceOf($container, $id = $name, LockFactory::class) &&
!$this->isServiceInstanceOf($container, $id = 'lock.'.$name.'.factory', LockFactory::class)
) {
throw new RuntimeException(sprintf('The lock resource "%s" does not exist.', $name));
}
$references[$name] = new Reference($id);
}
$container->getDefinition('scheduler.lock_locator')
->replaceArgument(0, $references);
}
private function isServiceInstanceOf(ContainerBuilder $container, string $serviceId, string $className): bool
{
if (!$container->hasDefinition($serviceId)) {
return false;
}
while (true) {
$definition = $container->getDefinition($serviceId);
if (!$definition->getClass() && $definition instanceof ChildDefinition) {
$serviceId = $definition->getParent();
continue;
}
return $definition->getClass() && is_a($definition->getClass(), $className, true);
}
}
}

View File

@@ -9,24 +9,31 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
namespace Symfony\Component\Scheduler\Generator;
use Symfony\Component\Lock\LockInterface;
use Symfony\Contracts\Cache\CacheInterface;
final class LockStateDecorator implements StateInterface
/**
* @experimental
*/
final class Checkpoint implements CheckpointInterface
{
private \DateTimeImmutable $time;
private int $index = -1;
private bool $reset = false;
public function __construct(
private readonly State $inner,
private readonly LockInterface $lock,
private readonly string $name,
private readonly ?LockInterface $lock = null,
private readonly ?CacheInterface $cache = null,
) {
}
public function acquire(\DateTimeImmutable $now): bool
{
if (!$this->lock->acquire()) {
// Reset local state if a `Lock` is acquired by another `Worker`.
if ($this->lock && !$this->lock->acquire()) {
// Reset local state if a Lock is acquired by another Worker.
$this->reset = true;
return false;
@@ -34,35 +41,44 @@ final class LockStateDecorator implements StateInterface
if ($this->reset) {
$this->reset = false;
$this->inner->save($now, -1);
$this->save($now, -1);
}
return $this->inner->acquire($now);
$this->time ??= $now;
if ($this->cache) {
$this->save(...$this->cache->get($this->name, fn () => [$now, -1]));
}
return true;
}
public function time(): \DateTimeImmutable
{
return $this->inner->time();
return $this->time;
}
public function index(): int
{
return $this->inner->index();
return $this->index;
}
public function save(\DateTimeImmutable $time, int $index): void
{
$this->inner->save($time, $index);
$this->time = $time;
$this->index = $index;
$this->cache?->get($this->name, fn () => [$time, $index], \INF);
}
/**
* Releases `State`, not `Lock`.
* Releases State, not Lock.
*
* It tries to keep a `Lock` as long as a `Worker` is alive.
* It tries to keep a Lock as long as a Worker is alive.
*/
public function release(\DateTimeImmutable $now, ?\DateTimeImmutable $nextTime): void
{
$this->inner->release($now, $nextTime);
if (!$this->lock) {
return;
}
if (!$nextTime) {
$this->lock->release();

View File

@@ -9,9 +9,12 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
namespace Symfony\Component\Scheduler\Generator;
interface StateInterface
/**
* @experimental
*/
interface CheckpointInterface
{
public function acquire(\DateTimeImmutable $now): bool;

View File

@@ -0,0 +1,98 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Generator;
use Psr\Clock\ClockInterface;
use Symfony\Component\Clock\Clock;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
/**
* @experimental
*/
final class MessageGenerator implements MessageGeneratorInterface
{
private TriggerHeap $triggerHeap;
private ?\DateTimeImmutable $waitUntil;
private CheckpointInterface $checkpoint;
public function __construct(
private Schedule $schedule,
string|CheckpointInterface $checkpoint,
private ClockInterface $clock = new Clock(),
) {
$this->waitUntil = new \DateTimeImmutable('@0');
if (\is_string($checkpoint)) {
$checkpoint = new Checkpoint('scheduler_checkpoint_'.$checkpoint, $this->schedule->getLock(), $this->schedule->getState());
}
$this->checkpoint = $checkpoint;
}
public function getMessages(): \Generator
{
if (!$this->waitUntil
|| $this->waitUntil > ($now = $this->clock->now())
|| !$this->checkpoint->acquire($now)
) {
return;
}
$lastTime = $this->checkpoint->time();
$lastIndex = $this->checkpoint->index();
$heap = $this->heap($lastTime);
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
/** @var TriggerInterface $trigger */
[$time, $index, $trigger, $message] = $heap->extract();
$yield = true;
if ($time < $lastTime) {
$time = $lastTime;
$yield = false;
} elseif ($time == $lastTime && $index <= $lastIndex) {
$yield = false;
}
if ($nextTime = $trigger->getNextRunDate($time)) {
$heap->insert([$nextTime, $index, $trigger, $message]);
}
if ($yield) {
yield $message;
$this->checkpoint->save($time, $index);
}
}
$this->waitUntil = $heap->isEmpty() ? null : $heap->top()[0];
$this->checkpoint->release($now, $this->waitUntil);
}
private function heap(\DateTimeImmutable $time): TriggerHeap
{
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
return $this->triggerHeap;
}
$heap = new TriggerHeap($time);
foreach ($this->schedule->getRecurringMessages() as $index => $recurringMessage) {
if (!$nextTime = $recurringMessage->getTrigger()->getNextRunDate($time)) {
continue;
}
$heap->insert([$nextTime, $index, $recurringMessage->getTrigger(), $recurringMessage->getMessage()]);
}
return $this->triggerHeap = $heap;
}
}

View File

@@ -9,9 +9,12 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Schedule;
namespace Symfony\Component\Scheduler\Generator;
interface ScheduleInterface
/**
* @experimental
*/
interface MessageGeneratorInterface
{
public function getMessages(): iterable;
}

View File

@@ -9,7 +9,7 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Schedule;
namespace Symfony\Component\Scheduler\Generator;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
@@ -17,8 +17,10 @@ use Symfony\Component\Scheduler\Trigger\TriggerInterface;
* @internal
*
* @extends \SplHeap<array{\DateTimeImmutable, int, TriggerInterface, object}>
*
* @experimental
*/
final class ScheduleHeap extends \SplHeap
final class TriggerHeap extends \SplHeap
{
public function __construct(
public \DateTimeImmutable $time,

View File

@@ -1,64 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Locator;
use Psr\Container\NotFoundExceptionInterface;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
final class ChainScheduleConfigLocator implements ScheduleConfigLocatorInterface
{
/**
* @var ScheduleConfigLocatorInterface[]
*/
private array $locators;
private array $lastFound = [];
/**
* @param iterable<ScheduleConfigLocatorInterface> $locators
*/
public function __construct(iterable $locators)
{
$this->locators = (static fn (ScheduleConfigLocatorInterface ...$l) => $l)(...$locators);
}
public function get(string $id): ScheduleConfig
{
if ($locator = $this->findLocator($id)) {
return $locator->get($id);
}
throw new class(sprintf('You have requested a non-existent schedule "%s".', $id)) extends \InvalidArgumentException implements NotFoundExceptionInterface { };
}
public function has(string $id): bool
{
return null !== $this->findLocator($id);
}
private function findLocator(string $id): ?ScheduleConfigLocatorInterface
{
if (isset($this->lastFound[$id])) {
return $this->lastFound[$id];
}
foreach ($this->locators as $locator) {
if ($locator->has($id)) {
$this->lastFound = [$id => $locator];
return $locator;
}
}
return null;
}
}

View File

@@ -1,20 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Locator;
use Psr\Container\ContainerInterface;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
interface ScheduleConfigLocatorInterface extends ContainerInterface
{
public function get(string $id): ScheduleConfig;
}

View File

@@ -1,73 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Messenger;
use Psr\Clock\ClockInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
use Symfony\Component\Scheduler\Locator\ScheduleConfigLocatorInterface;
use Symfony\Component\Scheduler\Schedule\Schedule;
use Symfony\Component\Scheduler\State\StateFactoryInterface;
class ScheduleTransportFactory implements TransportFactoryInterface
{
protected const DEFAULT_OPTIONS = [
'cache' => null,
'lock' => null,
];
public function __construct(
private readonly ClockInterface $clock,
private readonly ScheduleConfigLocatorInterface $schedules,
private readonly StateFactoryInterface $stateFactory,
) {
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): ScheduleTransport
{
if ('schedule://' === $dsn) {
throw new InvalidArgumentException('The Schedule DSN must contains a name, e.g. "schedule://default".');
}
if (false === $scheduleName = parse_url($dsn, \PHP_URL_HOST)) {
throw new InvalidArgumentException(sprintf('The given Schedule DSN "%s" is invalid.', $dsn));
}
unset($options['transport_name']);
$options += static::DEFAULT_OPTIONS;
if (0 < \count($invalidOptions = array_diff_key($options, static::DEFAULT_OPTIONS))) {
throw new InvalidArgumentException(sprintf('Invalid option(s) "%s" passed to the Schedule Messenger transport.', implode('", "', array_keys($invalidOptions))));
}
if (!$this->schedules->has($scheduleName)) {
throw new InvalidArgumentException(sprintf('The schedule "%s" is not found.', $scheduleName));
}
return new ScheduleTransport(
new Schedule(
$this->clock,
$this->stateFactory->create($scheduleName, $options),
$this->schedules->get($scheduleName)
)
);
}
public function supports(string $dsn, array $options): bool
{
return self::isSupported($dsn);
}
final public static function isSupported(string $dsn): bool
{
return str_starts_with($dsn, 'schedule://');
}
}

View File

@@ -13,6 +13,9 @@ namespace Symfony\Component\Scheduler\Messenger;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
/**
* @experimental
*/
final class ScheduledStamp implements NonSendableStampInterface
{
}

View File

@@ -13,23 +13,23 @@ namespace Symfony\Component\Scheduler\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Scheduler\Exception\LogicMessengerException;
use Symfony\Component\Scheduler\Schedule\ScheduleInterface;
use Symfony\Component\Scheduler\Exception\LogicException;
use Symfony\Component\Scheduler\Generator\MessageGeneratorInterface;
class ScheduleTransport implements TransportInterface
/**
* @experimental
*/
class SchedulerTransport implements TransportInterface
{
private readonly array $stamps;
public function __construct(
private readonly ScheduleInterface $schedule,
private readonly MessageGeneratorInterface $messageGenerator,
) {
$this->stamps = [new ScheduledStamp()];
}
public function get(): iterable
{
foreach ($this->schedule->getMessages() as $message) {
yield new Envelope($message, $this->stamps);
foreach ($this->messageGenerator->getMessages() as $message) {
yield Envelope::wrap($message, [new ScheduledStamp()]);
}
}
@@ -40,11 +40,11 @@ class ScheduleTransport implements TransportInterface
public function reject(Envelope $envelope): void
{
throw new LogicMessengerException('Messages from ScheduleTransport must not be rejected.');
throw new LogicException(sprintf('Messages from "%s" must not be rejected.', __CLASS__));
}
public function send(Envelope $envelope): Envelope
{
throw new LogicMessengerException('The ScheduleTransport cannot send messages.');
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
}
}

View File

@@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Messenger;
use Psr\Clock\ClockInterface;
use Psr\Container\ContainerInterface;
use Symfony\Component\Clock\Clock;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
use Symfony\Component\Scheduler\Generator\Checkpoint;
use Symfony\Component\Scheduler\Generator\MessageGenerator;
use Symfony\Component\Scheduler\Schedule;
/**
* @experimental
*/
class SchedulerTransportFactory implements TransportFactoryInterface
{
public function __construct(
private readonly ContainerInterface $scheduleProviders,
private readonly ClockInterface $clock = new Clock(),
) {
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): SchedulerTransport
{
if ('schedule://' === $dsn) {
throw new InvalidArgumentException('The Schedule DSN must contains a name, e.g. "schedule://default".');
}
if (false === $scheduleName = parse_url($dsn, \PHP_URL_HOST)) {
throw new InvalidArgumentException(sprintf('The given Schedule DSN "%s" is invalid.', $dsn));
}
if (!$this->scheduleProviders->has($scheduleName)) {
throw new InvalidArgumentException(sprintf('The schedule "%s" is not found.', $scheduleName));
}
/** @var Schedule $schedule */
$schedule = $this->scheduleProviders->get($scheduleName)->getSchedule();
$checkpoint = new Checkpoint('scheduler_checkpoint_'.$scheduleName, $schedule->getLock(), $schedule->getState());
return new SchedulerTransport(new MessageGenerator($schedule, $checkpoint, $this->clock));
}
public function supports(string $dsn, array $options): bool
{
return str_starts_with($dsn, 'schedule://');
}
}

View File

@@ -1,50 +1,12 @@
Scheduler Component
====================
===================
Provides basic scheduling through the Symfony Messenger.
Provides scheduling through Symfony Messenger.
Getting Started
---------------
```
$ composer require symfony/scheduler
```
Full DSN with schedule name: `schedule://<name>`
```yaml
# messenger.yaml
framework:
messenger:
transports:
schedule_default: 'schedule://default'
```
```php
<?php
use Symfony\Component\Scheduler\ScheduleConfig;
use Symfony\Component\Scheduler\Trigger\PeriodicalTrigger;
class ExampleLocator implements ScheduleConfigLocatorInterface
{
public function get(string $id): ScheduleConfig
{
return (new ScheduleConfig())
->add(
// do the MaintenanceJob every night at 3 a.m. UTC
PeriodicalTrigger::create('P1D', '03:00:00+00'),
new MaintenanceJob()
)
;
}
public function has(string $id): bool
{
return 'default' === $id;
}
}
```
**This Component is experimental**.
[Experimental features](https://symfony.com/doc/current/contributing/code/experimental.html)
are not covered by Symfony's
[Backward Compatibility Promise](https://symfony.com/doc/current/contributing/code/bc.html).
Resources
---------

58
RecurringMessage.php Normal file
View File

@@ -0,0 +1,58 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler;
use Symfony\Component\Scheduler\Trigger\CronExpressionTrigger;
use Symfony\Component\Scheduler\Trigger\PeriodicalTrigger;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
/**
* @experimental
*/
final class RecurringMessage
{
private function __construct(
private readonly TriggerInterface $trigger,
private readonly object $message,
) {
}
/**
* Uses a relative date format to define the frequency.
*
* @see https://php.net/datetime.formats.relative
*/
public static function every(string $frequency, object $message, \DateTimeImmutable $from = new \DateTimeImmutable(), ?\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
{
return new self(PeriodicalTrigger::create(\DateInterval::createFromDateString($frequency), $from, $until), $message);
}
public static function cron(string $expression, object $message): self
{
return new self(CronExpressionTrigger::fromSpec($expression), $message);
}
public static function trigger(TriggerInterface $trigger, object $message): self
{
return new self($trigger, $message);
}
public function getMessage(): object
{
return $this->message;
}
public function getTrigger(): TriggerInterface
{
return $this->trigger;
}
}

84
Schedule.php Normal file
View File

@@ -0,0 +1,84 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Lock\NoLock;
use Symfony\Contracts\Cache\CacheInterface;
/**
* @experimental
*/
final class Schedule implements ScheduleProviderInterface
{
/** @var array<RecurringMessage> */
private array $messages = [];
private ?LockInterface $lock = null;
private ?CacheInterface $state = null;
/**
* @return $this
*/
public function add(RecurringMessage $message, RecurringMessage ...$messages): static
{
$this->messages[] = $message;
$this->messages = array_merge($this->messages, $messages);
return $this;
}
/**
* @return $this
*/
public function lock(LockInterface $lock): static
{
$this->lock = $lock;
return $this;
}
public function getLock(): LockInterface
{
return $this->lock ?? new NoLock();
}
/**
* @return $this
*/
public function stateful(CacheInterface $state): static
{
$this->state = $state;
return $this;
}
public function getState(): ?CacheInterface
{
return $this->state;
}
/**
* @return array<RecurringMessage>
*/
public function getRecurringMessages(): array
{
return $this->messages;
}
/**
* @return $this
*/
public function getSchedule(): static
{
return $this;
}
}

View File

@@ -1,95 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Schedule;
use Psr\Clock\ClockInterface;
use Symfony\Component\Scheduler\State\StateInterface;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
final class Schedule implements ScheduleInterface
{
/**
* @var array<int, array{TriggerInterface, object}>
*/
private readonly array $schedule;
private ScheduleHeap $scheduleHeap;
private ?\DateTimeImmutable $waitUntil;
public function __construct(
private readonly ClockInterface $clock,
private readonly StateInterface $state,
ScheduleConfig $scheduleConfig,
) {
$this->schedule = $scheduleConfig->getSchedule();
$this->waitUntil = new \DateTimeImmutable('@0');
}
public function getMessages(): \Generator
{
if (!$this->waitUntil ||
$this->waitUntil > ($now = $this->clock->now()) ||
!$this->state->acquire($now)
) {
return;
}
$lastTime = $this->state->time();
$lastIndex = $this->state->index();
$heap = $this->heap($lastTime);
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
/** @var TriggerInterface $trigger */
[$time, $index, $trigger, $message] = $heap->extract();
$yield = true;
if ($time < $lastTime) {
$time = $lastTime;
$yield = false;
} elseif ($time == $lastTime && $index <= $lastIndex) {
$yield = false;
}
if ($nextTime = $trigger->nextTo($time)) {
$heap->insert([$nextTime, $index, $trigger, $message]);
}
if ($yield) {
yield $message;
$this->state->save($time, $index);
}
}
$this->waitUntil = $heap->isEmpty() ? null : $heap->top()[0];
$this->state->release($now, $this->waitUntil);
}
private function heap(\DateTimeImmutable $time): ScheduleHeap
{
if (isset($this->scheduleHeap) && $this->scheduleHeap->time <= $time) {
return $this->scheduleHeap;
}
$heap = new ScheduleHeap($time);
foreach ($this->schedule as $index => [$trigger, $message]) {
/** @var TriggerInterface $trigger */
if (!$nextTime = $trigger->nextTo($time)) {
continue;
}
$heap->insert([$nextTime, $index, $trigger, $message]);
}
return $this->scheduleHeap = $heap;
}
}

View File

@@ -1,47 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
final class ScheduleConfig
{
/**
* @var array<int, array{TriggerInterface, object}>
*/
private array $schedule = [];
/**
* @param iterable<array{TriggerInterface, object}> $schedule
*/
public function __construct(iterable $schedule = [])
{
foreach ($schedule as $args) {
$this->add(...$args);
}
}
public function add(TriggerInterface $trigger, object $message): self
{
$this->schedule[] = [$trigger, $message];
return $this;
}
/**
* @return array<int, array{TriggerInterface, object}>
*/
public function getSchedule(): array
{
return $this->schedule;
}
}

View File

@@ -9,11 +9,12 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Exception;
namespace Symfony\Component\Scheduler;
use Symfony\Component\Messenger\Exception\ExceptionInterface;
// not sure about this
class LogicMessengerException extends LogicException implements ExceptionInterface
/**
* @experimental
*/
interface ScheduleProviderInterface
{
public function getSchedule(): Schedule;
}

View File

@@ -1,56 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
use Symfony\Contracts\Cache\CacheInterface;
final class CacheStateDecorator implements StateInterface
{
public function __construct(
private readonly StateInterface $inner,
private readonly CacheInterface $cache,
private readonly string $name,
) {
}
public function acquire(\DateTimeImmutable $now): bool
{
if (!$this->inner->acquire($now)) {
return false;
}
$this->inner->save(...$this->cache->get($this->name, fn () => [$now, -1]));
return true;
}
public function time(): \DateTimeImmutable
{
return $this->inner->time();
}
public function index(): int
{
return $this->inner->index();
}
public function save(\DateTimeImmutable $time, int $index): void
{
$this->inner->save($time, $index);
$this->cache->get($this->name, fn () => [$time, $index], \INF);
}
public function release(\DateTimeImmutable $now, ?\DateTimeImmutable $nextTime): void
{
$this->inner->release($now, $nextTime);
}
}

View File

@@ -1,48 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
final class State implements StateInterface
{
private \DateTimeImmutable $time;
private int $index = -1;
public function acquire(\DateTimeImmutable $now): bool
{
if (!isset($this->time)) {
$this->time = $now;
}
return true;
}
public function time(): \DateTimeImmutable
{
return $this->time;
}
public function index(): int
{
return $this->index;
}
public function save(\DateTimeImmutable $time, int $index): void
{
$this->time = $time;
$this->index = $index;
}
public function release(\DateTimeImmutable $now, ?\DateTimeImmutable $nextTime): void
{
// skip
}
}

View File

@@ -1,91 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
use Psr\Container\ContainerInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Scheduler\Exception\LogicException;
use Symfony\Contracts\Cache\CacheInterface;
final class StateFactory implements StateFactoryInterface
{
public function __construct(
private readonly ContainerInterface $lockFactories,
private readonly ContainerInterface $caches,
) {
}
public function create(string $scheduleName, array $options): StateInterface
{
$name = 'messenger.schedule.'.$scheduleName;
$state = new State();
if ($lock = $this->createLock($scheduleName, $name, $options)) {
$state = new LockStateDecorator($state, $lock);
}
if ($cache = $this->createCache($scheduleName, $options)) {
$state = new CacheStateDecorator($state, $cache, $name);
}
return $state;
}
private function createLock(string $scheduleName, string $resourceName, array $options): ?LockInterface
{
if (!($options['lock'] ?? false)) {
return null;
}
if (\is_string($options['lock'])) {
$options['lock'] = ['resource' => $options['lock']];
}
if (\is_array($options['lock']) && \is_string($resource = $options['lock']['resource'] ?? null)) {
if (!$this->lockFactories->has($resource)) {
throw new LogicException(sprintf('The lock resource "%s" does not exist.', $resource));
}
/** @var LockFactory $lockFactory */
$lockFactory = $this->lockFactories->get($resource);
$args = ['resource' => $resourceName];
if (isset($options['lock']['ttl'])) {
$args['ttl'] = (float) $options['lock']['ttl'];
}
if (isset($options['lock']['auto_release'])) {
$args['autoRelease'] = (float) $options['lock']['auto_release'];
}
return $lockFactory->createLock(...$args);
}
throw new LogicException(sprintf('Invalid lock configuration for "%s" schedule.', $scheduleName));
}
private function createCache(string $scheduleName, array $options): ?CacheInterface
{
if (!($options['cache'] ?? false)) {
return null;
}
if (\is_string($options['cache'])) {
if (!$this->caches->has($options['cache'])) {
throw new LogicException(sprintf('The cache pool "%s" does not exist.', $options['cache']));
}
return $this->caches->get($options['cache']);
}
throw new LogicException(sprintf('Invalid cache configuration for "%s" schedule.', $scheduleName));
}
}

View File

@@ -1,20 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\State;
interface StateFactoryInterface
{
/**
* @param array<string, int|float|string|bool|null> $options
*/
public function create(string $scheduleName, array $options): StateInterface;
}

View File

@@ -0,0 +1,252 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\Generator;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Lock\NoLock;
use Symfony\Component\Lock\Store\InMemoryStore;
use Symfony\Component\Scheduler\Generator\Checkpoint;
class CheckpointTest extends TestCase
{
public function testWithoutLockAndWithoutState()
{
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$later = $now->modify('1 hour');
$checkpoint = new Checkpoint('dummy');
$this->assertTrue($checkpoint->acquire($now));
$this->assertSame($now, $checkpoint->time());
$this->assertSame(-1, $checkpoint->index());
$checkpoint->save($later, 7);
$this->assertSame($later, $checkpoint->time());
$this->assertSame(7, $checkpoint->index());
$checkpoint->release($later, null);
}
public function testWithStateInitStateOnFirstAcquiring()
{
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$this->assertTrue($checkpoint->acquire($now));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(-1, $checkpoint->index());
$this->assertEquals([$now, -1], $cache->get('cache', fn () => []));
}
public function testWithStateLoadStateOnAcquiring()
{
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$cache->get('cache', fn () => [$now, 0], \INF);
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(0, $checkpoint->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
}
public function testWithLockInitStateOnFirstAcquiring()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$this->assertTrue($checkpoint->acquire($now));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(-1, $checkpoint->index());
$this->assertTrue($lock->isAcquired());
}
public function testwithLockLoadStateOnAcquiring()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->save($now, 0);
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(0, $checkpoint->index());
$this->assertTrue($lock->isAcquired());
}
public function testWithLockCannotAcquireIfAlreadyAcquired()
{
$concurrentLock = new Lock(new Key('locked'), $store = new InMemoryStore(), autoRelease: false);
$concurrentLock->acquire();
$this->assertTrue($concurrentLock->isAcquired());
$lock = new Lock(new Key('locked'), $store, autoRelease: false);
$checkpoint = new Checkpoint('locked', $lock);
$this->assertFalse($checkpoint->acquire(new \DateTimeImmutable()));
}
public function testWithCacheSave()
{
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($n = $now->modify('-1 hour'));
$checkpoint->save($now, 3);
$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
$this->assertEquals([$now, 3], $cache->get('cache', fn () => []));
}
public function testWithLockSave()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-1 hour'));
$checkpoint->save($now, 3);
$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
}
public function testWithLockAndCacheSave()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock, $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-1 hour'));
$checkpoint->save($now, 3);
$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
$this->assertEquals([$now, 3], $cache->get('dummy', fn () => []));
}
public function testWithCacheFullCycle()
{
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
// init
$cache->get('cache', fn () => [$now->modify('-1 min'), 3], \INF);
// action
$acquired = $checkpoint->acquire($now);
$lastTime = $checkpoint->time();
$lastIndex = $checkpoint->index();
$checkpoint->save($now, 0);
$checkpoint->release($now, null);
// asserting
$this->assertTrue($acquired);
$this->assertEquals($now->modify('-1 min'), $lastTime);
$this->assertSame(3, $lastIndex);
$this->assertEquals($now, $checkpoint->time());
$this->assertSame(0, $checkpoint->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
}
public function testWithLockResetStateAfterLockedAcquiring()
{
$concurrentLock = new Lock(new Key('locked'), $store = new InMemoryStore(), autoRelease: false);
$concurrentLock->acquire();
$this->assertTrue($concurrentLock->isAcquired());
$lock = new Lock(new Key('locked'), $store, autoRelease: false);
$checkpoint = new Checkpoint('locked', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->save($now->modify('-2 min'), 0);
$checkpoint->acquire($now->modify('-1 min'));
$concurrentLock->release();
$this->assertTrue($checkpoint->acquire($now));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(-1, $checkpoint->index());
$this->assertTrue($lock->isAcquired());
$this->assertFalse($concurrentLock->isAcquired());
}
public function testWithLockKeepLock()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-1 min'));
$checkpoint->release($now, $now->modify('1 min'));
$this->assertTrue($lock->isAcquired());
}
public function testWithLockReleaseLock()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-1 min'));
$checkpoint->release($now, null);
$this->assertFalse($lock->isAcquired());
}
public function testWithLockRefreshLock()
{
$lock = $this->createMock(LockInterface::class);
$lock->method('acquire')->willReturn(true);
$lock->method('getRemainingLifetime')->willReturn(120.0);
$lock->expects($this->once())->method('refresh')->with(120.0 + 60.0);
$lock->expects($this->never())->method('release');
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-10 sec'));
$checkpoint->release($now, $now->modify('60 sec'));
}
public function testWithLockFullCycle()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
// init
$checkpoint->save($now->modify('-1 min'), 3);
// action
$acquired = $checkpoint->acquire($now);
$lastTime = $checkpoint->time();
$lastIndex = $checkpoint->index();
$checkpoint->save($now, 0);
$checkpoint->release($now, null);
// asserting
$this->assertTrue($acquired);
$this->assertEquals($now->modify('-1 min'), $lastTime);
$this->assertSame(3, $lastIndex);
$this->assertEquals($now, $checkpoint->time());
$this->assertSame(0, $checkpoint->index());
$this->assertFalse($lock->isAcquired());
}
}

View File

@@ -9,18 +9,49 @@
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\Schedule;
namespace Symfony\Component\Scheduler\Tests\Generator;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Scheduler\Schedule\Schedule;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
use Symfony\Component\Scheduler\State\State;
use Symfony\Component\Scheduler\Generator\MessageGenerator;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
class ScheduleTest extends TestCase
class MessageGeneratorTest extends TestCase
{
public function messagesProvider(): \Generator
/**
* @dataProvider messagesProvider
*/
public function testGetMessages(string $startTime, array $runs, array $schedule)
{
// for referencing
$now = self::makeDateTime($startTime);
$clock = $this->createMock(ClockInterface::class);
$clock->method('now')->willReturnReference($now);
foreach ($schedule as $i => $s) {
if (\is_array($s)) {
$schedule[$i] = $this->createMessage(...$s);
}
}
$schedule = (new Schedule())->add(...$schedule);
$schedule->stateful(new ArrayAdapter());
$scheduler = new MessageGenerator($schedule, 'dummy', $clock);
// Warmup. The first run is always returns nothing.
$this->assertSame([], iterator_to_array($scheduler->getMessages()));
foreach ($runs as $time => $expected) {
$now = self::makeDateTime($time);
$this->assertSame($expected, iterator_to_array($scheduler->getMessages()));
}
}
public static function messagesProvider(): \Generator
{
$first = (object) ['id' => 'first'];
$second = (object) ['id' => 'second'];
@@ -34,9 +65,7 @@ class ScheduleTest extends TestCase
'22:13:00' => [$first],
'22:13:01' => [],
],
'schedule' => [
$this->makeSchedule($first, '22:13:00', '22:14:00'),
],
'schedule' => [[$first, '22:13:00', '22:14:00']],
];
yield 'microseconds' => [
@@ -46,9 +75,7 @@ class ScheduleTest extends TestCase
'22:13:00' => [$first],
'22:13:01' => [],
],
'schedule' => [
$this->makeSchedule($first, '22:13:00', '22:14:00', '22:15:00'),
],
'schedule' => [[$first, '22:13:00', '22:14:00', '22:15:00']],
];
yield 'skipped' => [
@@ -56,9 +83,7 @@ class ScheduleTest extends TestCase
'runs' => [
'22:14:01' => [$first, $first],
],
'schedule' => [
$this->makeSchedule($first, '22:13:00', '22:14:00', '22:15:00'),
],
'schedule' => [[$first, '22:13:00', '22:14:00', '22:15:00']],
];
yield 'sequence' => [
@@ -71,9 +96,7 @@ class ScheduleTest extends TestCase
'22:14:00' => [$first],
'22:14:01' => [],
],
'schedule' => [
$this->makeSchedule($first, '22:13:00', '22:14:00', '22:15:00'),
],
'schedule' => [[$first, '22:13:00', '22:14:00', '22:15:00']],
];
yield 'concurrency' => [
@@ -85,9 +108,9 @@ class ScheduleTest extends TestCase
'22:13:02.555' => [],
],
'schedule' => [
$this->makeSchedule($first, '22:12:59', '22:13:00', '22:13:01', '22:13:02', '22:13:03'),
$this->makeSchedule($second, '22:13:00', '22:14:00'),
$this->makeSchedule($third, '22:12:30', '22:13:30'),
[$first, '22:12:59', '22:13:00', '22:13:01', '22:13:02', '22:13:03'],
[$second, '22:13:00', '22:14:00'],
[$third, '22:12:30', '22:13:30'],
],
];
@@ -100,8 +123,8 @@ class ScheduleTest extends TestCase
'22:14:01' => [],
],
'schedule' => [
$this->makeSchedule($first, '22:13:00', '22:14:00', '22:15:00'),
$this->makeSchedule($second, '22:13:00', '22:14:00', '22:15:00'),
[$first, '22:13:00', '22:14:00', '22:15:00'],
[$second, '22:13:00', '22:14:00', '22:15:00'],
],
];
@@ -111,51 +134,25 @@ class ScheduleTest extends TestCase
'22:12:01' => [],
],
'schedule' => [
[$this->createMock(TriggerInterface::class), $this],
RecurringMessage::trigger(new class() implements TriggerInterface {
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
return null;
}
}, (object) []),
],
];
}
/**
* @dataProvider messagesProvider
*/
public function testGetMessages(string $startTime, array $runs, array $schedule)
private function createMessage(object $message, string ...$runs): RecurringMessage
{
// for referencing
$now = $this->makeDateTime($startTime);
$clock = $this->createMock(ClockInterface::class);
$clock->method('now')->willReturnReference($now);
$scheduler = new Schedule($clock, new State(), new ScheduleConfig($schedule));
// Warmup. The first run is always returns nothing.
$this->assertSame([], iterator_to_array($scheduler->getMessages()));
foreach ($runs as $time => $expected) {
$now = $this->makeDateTime($time);
$this->assertSame($expected, iterator_to_array($scheduler->getMessages()));
}
}
private function makeDateTime(string $time): \DateTimeImmutable
{
return new \DateTimeImmutable('2020-02-20T'.$time, new \DateTimeZone('UTC'));
}
/**
* @return array{TriggerInterface, object}
*/
private function makeSchedule(object $message, string ...$runs): array
{
$runs = array_map(fn ($time) => $this->makeDateTime($time), $runs);
$runs = array_map(fn ($time) => self::makeDateTime($time), $runs);
sort($runs);
$ticks = [$this->makeDateTime(''), 0];
$ticks = [self::makeDateTime(''), 0];
$trigger = $this->createMock(TriggerInterface::class);
$trigger
->method('nextTo')
->method('getNextRunDate')
->willReturnCallback(function (\DateTimeImmutable $lastTick) use ($runs, &$ticks): \DateTimeImmutable {
[$tick, $count] = $ticks;
if ($lastTick > $tick) {
@@ -175,6 +172,11 @@ class ScheduleTest extends TestCase
$this->fail(sprintf('There is no next run for tick %s', $lastTick->format(\DateTimeImmutable::RFC3339_EXTENDED)));
});
return [$trigger, $message];
return RecurringMessage::trigger($trigger, $message);
}
private static function makeDateTime(string $time): \DateTimeImmutable
{
return new \DateTimeImmutable('2020-02-20T'.$time, new \DateTimeZone('UTC'));
}
}

View File

@@ -1,49 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\Locator;
use PHPUnit\Framework\TestCase;
use Psr\Container\NotFoundExceptionInterface;
use Symfony\Component\Scheduler\Locator\ChainScheduleConfigLocator;
use Symfony\Component\Scheduler\Locator\ScheduleConfigLocatorInterface;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
class ChainScheduleConfigLocatorTest extends TestCase
{
public function testExists()
{
$schedule = new ScheduleConfig();
$empty = $this->createMock(ScheduleConfigLocatorInterface::class);
$empty->expects($this->once())->method('has')->with('exists')->willReturn(false);
$empty->expects($this->never())->method('get');
$full = $this->createMock(ScheduleConfigLocatorInterface::class);
$full->expects($this->once())->method('has')->with('exists')->willReturn(true);
$full->expects($this->once())->method('get')->with('exists')->willReturn($schedule);
$locator = new ChainScheduleConfigLocator([$empty, $full]);
$this->assertTrue($locator->has('exists'));
$this->assertSame($schedule, $locator->get('exists'));
}
public function testNonExists()
{
$locator = new ChainScheduleConfigLocator([$this->createMock(ScheduleConfigLocatorInterface::class)]);
$this->assertFalse($locator->has('non-exists'));
$this->expectException(NotFoundExceptionInterface::class);
$locator->get('non-exists');
}
}

View File

@@ -13,52 +13,39 @@ namespace Symfony\Component\Scheduler\Tests\Messenger;
use PHPUnit\Framework\TestCase;
use Psr\Clock\ClockInterface;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
use Symfony\Component\Scheduler\Locator\ScheduleConfigLocatorInterface;
use Symfony\Component\Scheduler\Messenger\ScheduleTransport;
use Symfony\Component\Scheduler\Messenger\ScheduleTransportFactory;
use Symfony\Component\Scheduler\Schedule\Schedule;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
use Symfony\Component\Scheduler\State\StateFactoryInterface;
use Symfony\Component\Scheduler\State\StateInterface;
use Symfony\Component\Scheduler\Generator\MessageGenerator;
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
use Symfony\Contracts\Service\ServiceLocatorTrait;
class ScheduleTransportFactoryTest extends TestCase
class SchedulerTransportFactoryTest extends TestCase
{
public function testCreateTransport()
{
$trigger = $this->createMock(TriggerInterface::class);
$serializer = $this->createMock(SerializerInterface::class);
$clock = $this->createMock(ClockInterface::class);
$container = new class() extends \ArrayObject implements ScheduleConfigLocatorInterface {
public function get(string $id): ScheduleConfig
{
return $this->offsetGet($id);
}
public function has(string $id): bool
{
return $this->offsetExists($id);
}
};
$defaultRecurringMessage = RecurringMessage::trigger($trigger, (object) ['id' => 'default']);
$customRecurringMessage = RecurringMessage::trigger($trigger, (object) ['id' => 'custom']);
$stateFactory = $this->createMock(StateFactoryInterface::class);
$stateFactory
->expects($this->exactly(2))
->method('create')
->withConsecutive(
['default', ['cache' => null, 'lock' => null]],
['custom', ['cache' => 'app', 'lock' => null]]
)
->willReturn($state = $this->createMock(StateInterface::class));
$default = new SchedulerTransport(new MessageGenerator((new SomeScheduleProvider([$defaultRecurringMessage]))->getSchedule(), 'default', $clock));
$custom = new SchedulerTransport(new MessageGenerator((new SomeScheduleProvider([$customRecurringMessage]))->getSchedule(), 'custom', $clock));
$container['default'] = new ScheduleConfig([[$trigger, (object) ['id' => 'default']]]);
$container['custom'] = new ScheduleConfig([[$trigger, (object) ['id' => 'custom']]]);
$default = new ScheduleTransport(new Schedule($clock, $state, $container['default']));
$custom = new ScheduleTransport(new Schedule($clock, $state, $container['custom']));
$factory = new ScheduleTransportFactory($clock, $container, $stateFactory);
$factory = new SchedulerTransportFactory(
new Container([
'default' => fn () => (new SomeScheduleProvider([$defaultRecurringMessage]))->getSchedule(),
'custom' => fn () => (new SomeScheduleProvider([$customRecurringMessage]))->getSchedule(),
]),
$clock,
);
$this->assertEquals($default, $factory->createTransport('schedule://default', [], $serializer));
$this->assertEquals($custom, $factory->createTransport('schedule://custom', ['cache' => 'app'], $serializer));
@@ -84,16 +71,6 @@ class ScheduleTransportFactoryTest extends TestCase
$factory->createTransport('schedule://', [], $this->createMock(SerializerInterface::class));
}
public function testInvalidOption()
{
$factory = $this->makeTransportFactoryWithStubs();
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Invalid option(s) "invalid" passed to the Schedule Messenger transport.');
$factory->createTransport('schedule://name', ['invalid' => true], $this->createMock(SerializerInterface::class));
}
public function testNotFound()
{
$factory = $this->makeTransportFactoryWithStubs();
@@ -114,12 +91,31 @@ class ScheduleTransportFactoryTest extends TestCase
$this->assertFalse($factory->supports('string', []));
}
private function makeTransportFactoryWithStubs(): ScheduleTransportFactory
private function makeTransportFactoryWithStubs(): SchedulerTransportFactory
{
return new ScheduleTransportFactory(
return new SchedulerTransportFactory(
new Container([
'default' => fn () => $this->createMock(ScheduleProviderInterface::class),
]),
$this->createMock(ClockInterface::class),
$this->createMock(ScheduleConfigLocatorInterface::class),
$this->createMock(StateFactoryInterface::class)
);
}
}
class SomeScheduleProvider implements ScheduleProviderInterface
{
public function __construct(
private array $messages,
) {
}
public function getSchedule(): Schedule
{
return (new Schedule())->add(...$this->messages);
}
}
class Container implements ContainerInterface
{
use ServiceLocatorTrait;
}

View File

@@ -13,12 +13,12 @@ namespace Symfony\Component\Scheduler\Tests\Messenger;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Scheduler\Exception\LogicMessengerException;
use Symfony\Component\Scheduler\Exception\LogicException;
use Symfony\Component\Scheduler\Generator\MessageGeneratorInterface;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduleTransport;
use Symfony\Component\Scheduler\Schedule\ScheduleInterface;
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
class ScheduleTransportTest extends TestCase
class SchedulerTransportTest extends TestCase
{
public function testGetFromIterator()
{
@@ -26,10 +26,10 @@ class ScheduleTransportTest extends TestCase
(object) ['id' => 'first'],
(object) ['id' => 'second'],
];
$scheduler = $this->createConfiguredMock(ScheduleInterface::class, [
$generator = $this->createConfiguredMock(MessageGeneratorInterface::class, [
'getMessages' => $messages,
]);
$transport = new ScheduleTransport($scheduler);
$transport = new SchedulerTransport($generator);
foreach ($transport->get() as $envelope) {
$this->assertInstanceOf(Envelope::class, $envelope);
@@ -42,26 +42,25 @@ class ScheduleTransportTest extends TestCase
public function testAckIgnored()
{
$transport = new ScheduleTransport($this->createMock(ScheduleInterface::class));
$transport = new SchedulerTransport($this->createMock(MessageGeneratorInterface::class));
$this->expectNotToPerformAssertions();
$transport->ack(new Envelope(new \stdClass()));
$this->assertTrue(true); // count coverage
}
public function testRejectException()
{
$transport = new ScheduleTransport($this->createMock(ScheduleInterface::class));
$transport = new SchedulerTransport($this->createMock(MessageGeneratorInterface::class));
$this->expectException(LogicMessengerException::class);
$this->expectException(LogicException::class);
$transport->reject(new Envelope(new \stdClass()));
}
public function testSendException()
{
$transport = new ScheduleTransport($this->createMock(ScheduleInterface::class));
$transport = new SchedulerTransport($this->createMock(MessageGeneratorInterface::class));
$this->expectException(LogicMessengerException::class);
$this->expectException(LogicException::class);
$transport->send(new Envelope(new \stdClass()));
}
}

View File

@@ -1,61 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\Schedule;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Scheduler\Schedule\ScheduleConfig;
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
class ScheduleConfigTest extends TestCase
{
public function testEmpty()
{
$config = new ScheduleConfig();
$this->assertSame([], $config->getSchedule());
}
public function testAdd()
{
$config = new ScheduleConfig();
$config->add($t1 = $this->createMock(TriggerInterface::class), $o1 = (object) ['name' => 'first']);
$config->add($t2 = $this->createMock(TriggerInterface::class), $o2 = (object) ['name' => 'second']);
$expected = [
[$t1, $o1],
[$t2, $o2],
];
$this->assertSame($expected, $config->getSchedule());
}
public function testFromIterator()
{
$expected = [
[$this->createMock(TriggerInterface::class), (object) ['name' => 'first']],
[$this->createMock(TriggerInterface::class), (object) ['name' => 'second']],
];
$config = new ScheduleConfig(new \ArrayObject($expected));
$this->assertSame($expected, $config->getSchedule());
}
public function testFromBadIterator()
{
$this->expectException(\TypeError::class);
$this->expectExceptionMessage('must be of type Symfony\Component\Scheduler\Trigger\TriggerInterface');
new ScheduleConfig([new \ArrayObject(['wrong'])]);
}
}

View File

@@ -1,115 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\State;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Scheduler\State\CacheStateDecorator;
use Symfony\Component\Scheduler\State\State;
use Symfony\Component\Scheduler\State\StateInterface;
use Symfony\Contracts\Cache\CacheInterface;
class CacheStateDecoratorTest extends TestCase
{
private ArrayAdapter $cache;
private State $inner;
private CacheStateDecorator $state;
private \DateTimeImmutable $now;
protected function setUp(): void
{
$this->cache = new ArrayAdapter(storeSerialized: false);
$this->inner = new State();
$this->state = new CacheStateDecorator($this->inner, $this->cache, 'cache');
$this->now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
}
public function testInitStateOnFirstAcquiring()
{
[$cache, $state, $now] = [$this->cache, $this->state, $this->now];
$this->assertTrue($state->acquire($now));
$this->assertEquals($now, $state->time());
$this->assertEquals(-1, $state->index());
$this->assertEquals([$now, -1], $cache->get('cache', fn () => []));
}
public function testLoadStateOnAcquiring()
{
[$cache, $inner, $state, $now] = [$this->cache, $this->inner, $this->state, $this->now];
$cache->get('cache', fn () => [$now, 0], \INF);
$this->assertTrue($state->acquire($now->modify('1 min')));
$this->assertEquals($now, $state->time());
$this->assertEquals(0, $state->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
}
public function testCannotAcquereIfInnerAcquered()
{
$inner = $this->createMock(StateInterface::class);
$inner->method('acquire')->willReturn(false);
$state = new CacheStateDecorator($inner, $this->cache, 'cache');
$this->assertFalse($state->acquire($this->now));
}
public function testSave()
{
[$cache, $inner, $state, $now] = [$this->cache, $this->inner, $this->state, $this->now];
$state->acquire($now->modify('-1 hour'));
$state->save($now, 3);
$this->assertSame($now, $state->time());
$this->assertSame(3, $state->index());
$this->assertSame($inner->time(), $state->time());
$this->assertSame($inner->index(), $state->index());
$this->assertSame([$now, 3], $cache->get('cache', fn () => []));
}
public function testRelease()
{
$now = $this->now;
$later = $now->modify('1 min');
$cache = $this->createMock(CacheInterface::class);
$inner = $this->createMock(StateInterface::class);
$inner->expects($this->once())->method('release')->with($now, $later);
$state = new CacheStateDecorator($inner, $cache, 'cache');
$state->release($now, $later);
}
public function testFullCycle()
{
[$cache, $inner, $state, $now] = [$this->cache, $this->inner, $this->state, $this->now];
// init
$cache->get('cache', fn () => [$now->modify('-1 min'), 3], \INF);
// action
$acquired = $state->acquire($now);
$lastTime = $state->time();
$lastIndex = $state->index();
$state->save($now, 0);
$state->release($now, null);
// asserting
$this->assertTrue($acquired);
$this->assertEquals($now->modify('-1 min'), $lastTime);
$this->assertSame(3, $lastIndex);
$this->assertEquals($now, $inner->time());
$this->assertSame(0, $inner->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
}
}

View File

@@ -1,172 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\State;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Lock\Store\InMemoryStore;
use Symfony\Component\Scheduler\State\LockStateDecorator;
use Symfony\Component\Scheduler\State\State;
class LockStateDecoratorTest extends TestCase
{
private InMemoryStore $store;
private Lock $lock;
private State $inner;
private LockStateDecorator $state;
private \DateTimeImmutable $now;
protected function setUp(): void
{
$this->store = new InMemoryStore();
$this->lock = new Lock(new Key('lock'), $this->store);
$this->inner = new State();
$this->state = new LockStateDecorator($this->inner, $this->lock);
$this->now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
}
public function testSave()
{
[$inner, $state, $now] = [$this->inner, $this->state, $this->now];
$state->acquire($now->modify('-1 hour'));
$state->save($now, 3);
$this->assertSame($now, $state->time());
$this->assertSame(3, $state->index());
$this->assertSame($inner->time(), $state->time());
$this->assertSame($inner->index(), $state->index());
}
public function testInitStateOnFirstAcquiring()
{
[$lock, $state, $now] = [$this->lock, $this->state, $this->now];
$this->assertTrue($state->acquire($now));
$this->assertEquals($now, $state->time());
$this->assertEquals(-1, $state->index());
$this->assertTrue($lock->isAcquired());
}
public function testLoadStateOnAcquiring()
{
[$lock, $inner, $state, $now] = [$this->lock, $this->inner, $this->state, $this->now];
$inner->save($now, 0);
$this->assertTrue($state->acquire($now->modify('1 min')));
$this->assertEquals($now, $state->time());
$this->assertEquals(0, $state->index());
$this->assertTrue($lock->isAcquired());
}
public function testCannotAcquereIfLocked()
{
[$state, $now] = [$this->state, $this->now];
$this->concurrentLock();
$this->assertFalse($state->acquire($now));
}
public function testResetStateAfterLockedAcquiring()
{
[$lock, $inner, $state, $now] = [$this->lock, $this->inner, $this->state, $this->now];
$concurrentLock = $this->concurrentLock();
$inner->save($now->modify('-2 min'), 0);
$state->acquire($now->modify('-1 min'));
$concurrentLock->release();
$this->assertTrue($state->acquire($now));
$this->assertEquals($now, $state->time());
$this->assertEquals(-1, $state->index());
$this->assertTrue($lock->isAcquired());
$this->assertFalse($concurrentLock->isAcquired());
}
public function testKeepLock()
{
[$lock, $state, $now] = [$this->lock, $this->state, $this->now];
$state->acquire($now->modify('-1 min'));
$state->release($now, $now->modify('1 min'));
$this->assertTrue($lock->isAcquired());
}
public function testReleaseLock()
{
[$lock, $state, $now] = [$this->lock, $this->state, $this->now];
$state->acquire($now->modify('-1 min'));
$state->release($now, null);
$this->assertFalse($lock->isAcquired());
}
public function testRefreshLock()
{
$lock = $this->createMock(LockInterface::class);
$lock->method('acquire')->willReturn(true);
$lock->method('getRemainingLifetime')->willReturn(120.0);
$lock->expects($this->once())->method('refresh')->with(120.0 + 60.0);
$lock->expects($this->never())->method('release');
$state = new LockStateDecorator(new State(), $lock);
$now = $this->now;
$state->acquire($now->modify('-10 sec'));
$state->release($now, $now->modify('60 sec'));
}
public function testFullCycle()
{
[$lock, $inner, $state, $now] = [$this->lock, $this->inner, $this->state, $this->now];
// init
$inner->save($now->modify('-1 min'), 3);
// action
$acquired = $state->acquire($now);
$lastTime = $state->time();
$lastIndex = $state->index();
$state->save($now, 0);
$state->release($now, null);
// asserting
$this->assertTrue($acquired);
$this->assertEquals($now->modify('-1 min'), $lastTime);
$this->assertSame(3, $lastIndex);
$this->assertEquals($now, $inner->time());
$this->assertSame(0, $inner->index());
$this->assertFalse($lock->isAcquired());
}
// No need to unlock after test, because the `InMemoryStore` is deleted
private function concurrentLock(): Lock
{
$lock = new Lock(
key: new Key('lock'),
store: $this->store,
autoRelease: false
);
if (!$lock->acquire()) {
throw new \LogicException('Already locked.');
}
return $lock;
}
}

View File

@@ -1,176 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\State;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\InMemoryStore;
use Symfony\Component\Scheduler\Exception\LogicException;
use Symfony\Component\Scheduler\State\CacheStateDecorator;
use Symfony\Component\Scheduler\State\LockStateDecorator;
use Symfony\Component\Scheduler\State\State;
use Symfony\Component\Scheduler\State\StateFactory;
use Symfony\Contracts\Cache\CacheInterface;
class StateFactoryTest extends TestCase
{
public function testCreateSimple()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([])
);
$expected = new State();
$this->assertEquals($expected, $factory->create('name', []));
}
public function testCreateWithCache()
{
$cache = $this->createMock(CacheInterface::class);
$cache->method('get')->willReturnCallback(fn ($key, \Closure $f) => $f());
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer(['app' => $cache]),
);
$state = new State();
$expected = new CacheStateDecorator($state, $cache, 'messenger.schedule.name');
$this->assertEquals($expected, $factory->create('name', ['cache' => 'app']));
}
public function testCreateWithCacheAndLock()
{
$cache = $this->createMock(CacheInterface::class);
$cache->method('get')->willReturnCallback(fn ($key, \Closure $f) => $f());
$lockFactory = new LockFactory(new InMemoryStore());
$factory = new StateFactory(
$this->makeContainer(['unlock' => $lockFactory]),
$this->makeContainer(['app' => $cache]),
);
$lock = $lockFactory->createLock($name = 'messenger.schedule.name');
$state = new State();
$state = new LockStateDecorator($state, $lock);
$expected = new CacheStateDecorator($state, $cache, $name);
$this->assertEquals($expected, $factory->create('name', ['cache' => 'app', 'lock' => 'unlock']));
}
public function testCreateWithConfiguredLock()
{
$cache = $this->createMock(CacheInterface::class);
$cache->method('get')->willReturnCallback(fn ($key, \Closure $f) => $f());
$lockFactory = new LockFactory(new InMemoryStore());
$factory = new StateFactory(
$this->makeContainer(['unlock' => $lockFactory]),
$this->makeContainer([]),
);
$lock = $lockFactory->createLock('messenger.schedule.name', $ttl = 77.7, false);
$state = new State();
$expected = new LockStateDecorator($state, $lock);
$cfg = [
'resource' => 'unlock',
'ttl' => $ttl,
'auto_release' => false,
];
$this->assertEquals($expected, $factory->create('name', ['lock' => $cfg]));
}
public function testInvalidCacheName()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([])
);
$this->expectException(LogicException::class);
$this->expectExceptionMessage('The cache pool "wrong-cache" does not exist.');
$factory->create('name', ['cache' => 'wrong-cache']);
}
public function testInvalidLockName()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([])
);
$this->expectException(LogicException::class);
$this->expectExceptionMessage('The lock resource "wrong-lock" does not exist.');
$factory->create('name', ['lock' => 'wrong-lock']);
}
public function testInvalidConfiguredLockName()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([])
);
$this->expectException(LogicException::class);
$this->expectExceptionMessage('The lock resource "wrong-lock" does not exist.');
$factory->create('name', ['lock' => ['resource' => 'wrong-lock']]);
}
public function testInvalidCacheOption()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([]),
);
$this->expectException(LogicException::class);
$this->expectExceptionMessage('Invalid cache configuration for "default" schedule.');
$factory->create('default', ['cache' => true]);
}
public function testInvalidLockOption()
{
$factory = new StateFactory(
$this->makeContainer([]),
$this->makeContainer([]),
);
$this->expectException(LogicException::class);
$this->expectExceptionMessage('Invalid lock configuration for "default" schedule.');
$factory->create('default', ['lock' => true]);
}
private function makeContainer(array $services): ContainerInterface|\ArrayObject
{
return new class($services) extends \ArrayObject implements ContainerInterface {
public function get(string $id): mixed
{
return $this->offsetGet($id);
}
public function has(string $id): bool
{
return $this->offsetExists($id);
}
};
}
}

View File

@@ -1,36 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\State;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Scheduler\State\State;
class StateTest extends TestCase
{
public function testState()
{
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$later = $now->modify('1 hour');
$state = new State();
$this->assertTrue($state->acquire($now));
$this->assertSame($now, $state->time());
$this->assertSame(-1, $state->index());
$state->save($later, 7);
$this->assertSame($later, $state->time());
$this->assertSame(7, $state->index());
$state->release($later, null);
}
}

View File

@@ -21,7 +21,7 @@ class ExcludeTimeTriggerTest extends TestCase
{
$inner = $this->createMock(TriggerInterface::class);
$inner
->method('nextTo')
->method('getNextRunDate')
->willReturnCallback(static fn (\DateTimeImmutable $d) => $d->modify('+1 sec'));
$scheduled = new ExcludeTimeTrigger(
@@ -30,9 +30,9 @@ class ExcludeTimeTriggerTest extends TestCase
new \DateTimeImmutable('2020-02-20T20:20:20Z')
);
$this->assertEquals(new \DateTimeImmutable('2020-02-20T02:02:01Z'), $scheduled->nextTo(new \DateTimeImmutable('2020-02-20T02:02:00Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T20:20:21Z'), $scheduled->nextTo(new \DateTimeImmutable('2020-02-20T02:02:02Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T20:20:21Z'), $scheduled->nextTo(new \DateTimeImmutable('2020-02-20T20:20:20Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T22:22:23Z'), $scheduled->nextTo(new \DateTimeImmutable('2020-02-20T22:22:22Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T02:02:01Z'), $scheduled->getNextRunDate(new \DateTimeImmutable('2020-02-20T02:02:00Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T20:20:21Z'), $scheduled->getNextRunDate(new \DateTimeImmutable('2020-02-20T02:02:02Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T20:20:21Z'), $scheduled->getNextRunDate(new \DateTimeImmutable('2020-02-20T20:20:20Z')));
$this->assertEquals(new \DateTimeImmutable('2020-02-20T22:22:23Z'), $scheduled->getNextRunDate(new \DateTimeImmutable('2020-02-20T22:22:22Z')));
}
}

View File

@@ -1,29 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Tests\Trigger;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Scheduler\Trigger\OnceTrigger;
class OnceTriggerTest extends TestCase
{
public function testNextTo()
{
$time = new \DateTimeImmutable('2020-02-20 20:00:00');
$schedule = new OnceTrigger($time);
$this->assertEquals($time, $schedule->nextTo(new \DateTimeImmutable('@0'), ''));
$this->assertEquals($time, $schedule->nextTo($time->modify('-1 sec'), ''));
$this->assertNull($schedule->nextTo($time, ''));
$this->assertNull($schedule->nextTo($time->modify('+1 sec'), ''));
}
}

View File

@@ -17,7 +17,15 @@ use Symfony\Component\Scheduler\Trigger\PeriodicalTrigger;
class PeriodicalTriggerTest extends TestCase
{
public function providerNextTo(): iterable
/**
* @dataProvider providerGetNextRunDate
*/
public function testGetNextRunDate(PeriodicalTrigger $periodicalMessage, \DateTimeImmutable $lastRun, ?\DateTimeImmutable $expected)
{
$this->assertEquals($expected, $periodicalMessage->getNextRunDate($lastRun));
}
public static function providerGetNextRunDate(): iterable
{
$periodicalMessage = new PeriodicalTrigger(
600,
@@ -79,14 +87,6 @@ class PeriodicalTriggerTest extends TestCase
];
}
/**
* @dataProvider providerNextTo
*/
public function testNextTo(PeriodicalTrigger $periodicalMessage, \DateTimeImmutable $lastRun, ?\DateTimeImmutable $expected)
{
$this->assertEquals($expected, $periodicalMessage->nextTo($lastRun));
}
public function testConstructors()
{
$firstRun = new \DateTimeImmutable($now = '2222-02-22');
@@ -115,14 +115,6 @@ class PeriodicalTriggerTest extends TestCase
PeriodicalTrigger::create(\PHP_INT_MAX.'0', new \DateTimeImmutable('2002-02-02'));
}
public function getInvalidIntervals(): iterable
{
yield ['wrong'];
yield ['3600.5'];
yield [0];
yield [-3600];
}
/**
* @dataProvider getInvalidIntervals
*/
@@ -132,6 +124,14 @@ class PeriodicalTriggerTest extends TestCase
PeriodicalTrigger::create($interval, $now = new \DateTimeImmutable(), $now->modify('1 day'));
}
public static function getInvalidIntervals(): iterable
{
yield ['wrong'];
yield ['3600.5'];
yield [0];
yield [-3600];
}
public function testNegativeInterval()
{
$this->expectException(InvalidArgumentException::class);

View File

@@ -0,0 +1,32 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Trigger;
/**
* @author Fabien Potencier <fabien@symfony.com>
*
* @experimental
*/
final class CallbackTrigger implements TriggerInterface
{
private \Closure $callback;
public function __construct(callable $callback)
{
$this->callback = $callback(...);
}
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
return ($this->callback)($run);
}
}

View File

@@ -0,0 +1,44 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Trigger;
use Cron\CronExpression;
use Symfony\Component\Scheduler\Exception\LogicException;
/**
* Use cron expressions to describe a periodical trigger.
*
* @author Fabien Potencier <fabien@symfony.com>
*
* @experimental
*/
final class CronExpressionTrigger implements TriggerInterface
{
public function __construct(
private CronExpression $expression = new CronExpression('* * * * *'),
) {
}
public static function fromSpec(string $expression = '* * * * *'): self
{
if (!class_exists(CronExpression::class)) {
throw new LogicException(sprintf('You cannot use "%s" as the "cron expression" package is not installed; try running "composer require dragonmantank/cron-expression".', __CLASS__));
}
return new self(new CronExpression($expression));
}
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
return \DateTimeImmutable::createFromMutable($this->expression->getNextRunDate($run));
}
}

View File

@@ -11,6 +11,9 @@
namespace Symfony\Component\Scheduler\Trigger;
/**
* @experimental
*/
final class ExcludeTimeTrigger implements TriggerInterface
{
public function __construct(
@@ -20,11 +23,11 @@ final class ExcludeTimeTrigger implements TriggerInterface
) {
}
public function nextTo(\DateTimeImmutable $run): \DateTimeImmutable
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
$nextRun = $this->inner->nextTo($run);
$nextRun = $this->inner->getNextRunDate($run);
if ($nextRun >= $this->from && $nextRun <= $this->to) {
return $this->inner->nextTo($this->to);
return $this->inner->getNextRunDate($this->to);
}
return $nextRun;

View File

@@ -1,25 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Scheduler\Trigger;
final class OnceTrigger implements TriggerInterface
{
public function __construct(
private readonly \DateTimeImmutable $time,
) {
}
public function nextTo(\DateTimeImmutable $run): ?\DateTimeImmutable
{
return $run < $this->time ? $this->time : null;
}
}

View File

@@ -13,21 +13,24 @@ namespace Symfony\Component\Scheduler\Trigger;
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
/**
* @experimental
*/
final class PeriodicalTrigger implements TriggerInterface
{
public function __construct(
private readonly int $intervalInSeconds,
private readonly \DateTimeImmutable $firstRun,
private readonly \DateTimeImmutable $priorTo,
private readonly \DateTimeImmutable $firstRun = new \DateTimeImmutable(),
private readonly \DateTimeImmutable $priorTo = new \DateTimeImmutable('3000-01-01'),
) {
if (0 >= $this->intervalInSeconds) {
throw new InvalidArgumentException('The `$intervalInSeconds` argument must be greater then zero.');
throw new InvalidArgumentException('The "$intervalInSeconds" argument must be greater then zero.');
}
}
public static function create(
string|int|\DateInterval $interval,
string|\DateTimeImmutable $firstRun,
string|\DateTimeImmutable $firstRun = new \DateTimeImmutable(),
string|\DateTimeImmutable $priorTo = new \DateTimeImmutable('3000-01-01'),
): self {
if (\is_string($firstRun)) {
@@ -67,6 +70,24 @@ final class PeriodicalTrigger implements TriggerInterface
return new self($interval, $firstRun, $priorTo);
}
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
if ($this->firstRun > $run) {
return $this->firstRun;
}
if ($this->priorTo <= $run) {
return null;
}
$delta = $run->format('U.u') - $this->firstRun->format('U.u');
$recurrencesPassed = (int) ($delta / $this->intervalInSeconds);
$nextRunTimestamp = ($recurrencesPassed + 1) * $this->intervalInSeconds + $this->firstRun->getTimestamp();
/** @var \DateTimeImmutable $nextRun */
$nextRun = \DateTimeImmutable::createFromFormat('U.u', $nextRunTimestamp.$this->firstRun->format('.u'));
return $this->priorTo > $nextRun ? $nextRun : null;
}
private static function calcInterval(\DateTimeImmutable $from, \DateTimeImmutable $to): int
{
if (8 <= \PHP_INT_SIZE) {
@@ -84,29 +105,7 @@ final class PeriodicalTrigger implements TriggerInterface
private static function ensureIntervalSize(string|float $interval): void
{
if ($interval > \PHP_INT_MAX) {
throw new InvalidArgumentException('The interval for a periodical message is too big. If you need to run it once, use `$priorTo` argument.');
throw new InvalidArgumentException('The interval for a periodical message is too big. If you need to run it once, use "$priorTo" argument.');
}
}
public function nextTo(\DateTimeImmutable $run): ?\DateTimeImmutable
{
if ($this->firstRun > $run) {
return $this->firstRun;
}
if ($this->priorTo <= $run) {
return null;
}
$delta = (float) $run->format('U.u') - (float) $this->firstRun->format('U.u');
$recurrencesPassed = (int) ($delta / $this->intervalInSeconds);
$nextRunTimestamp = ($recurrencesPassed + 1) * $this->intervalInSeconds + $this->firstRun->getTimestamp();
/** @var \DateTimeImmutable $nextRun */
$nextRun = \DateTimeImmutable::createFromFormat('U.u', $nextRunTimestamp.$this->firstRun->format('.u'));
if ($this->priorTo <= $nextRun) {
return null;
}
return $nextRun;
}
}

View File

@@ -11,7 +11,10 @@
namespace Symfony\Component\Scheduler\Trigger;
/**
* @experimental
*/
interface TriggerInterface
{
public function nextTo(\DateTimeImmutable $run): ?\DateTimeImmutable;
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable;
}

View File

@@ -1,7 +1,7 @@
{
"name": "symfony/scheduler",
"type": "library",
"description": "Provides basic scheduling through the Symfony Messenger",
"description": "Provides scheduling through Symfony Messenger",
"keywords": ["scheduler", "schedule", "cron"],
"homepage": "https://symfony.com",
"license": "MIT",
@@ -10,6 +10,10 @@
"name": "Sergey Rabochiy",
"email": "upyx.00@gmail.com"
},
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
@@ -17,20 +21,14 @@
],
"require": {
"php": ">=8.1",
"psr/clock-implementation": "^1.0",
"psr/container-implementation": "^1.1|^2.0",
"symfony/cache-implementation": "^1.1|^2.0|^3.0",
"symfony/messenger": "^5.4|^6.0"
"symfony/clock": "^6.3"
},
"require-dev": {
"dragonmantank/cron-expression": "^3",
"symfony/cache": "^5.4|^6.0",
"symfony/clock": "^6.2",
"symfony/dependency-injection": "^5.4|^6.0",
"symfony/lock": "^5.4|^6.0"
},
"suggest": {
"symfony/cache": "For saving state between restarts.",
"symfony/lock": "For preventing workers race."
"symfony/lock": "^5.4|^6.0",
"symfony/messenger": "^6.3"
},
"autoload": {
"psr-4": { "Symfony\\Component\\Scheduler\\": "" },