2 Commits

Author SHA1 Message Date
Jérémy J
83f00e79d3 php-cs-fixer 2026-02-23 16:59:28 +01:00
Jérémy J
ee5703f0ab Added cleanup commands 2026-02-23 16:55:22 +01:00
12 changed files with 131 additions and 5 deletions

View File

@@ -1,3 +1,6 @@
# Version 5.5.0
* Added cleanup commands
# Version 5.4.1
* Fix File Exceptions integration

View File

@@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`.
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
@@ -598,6 +597,10 @@ the messenger component instead.
`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries
`code-rhapsodie:dataflow:set_crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status.
`code-rhapsodie:dataflow:job_cleanup` Remove old completed or crashed jobs
### Work with many databases
All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.

View File

@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(name: 'code-rhapsodie:job_cleanup', description: 'Cleanup job history.')]
class JobCleanupCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $retention)
{
parent::__construct();
}
protected function configure()
{
$this->setHelp('Job retention can be configured with the "job_history.retention" configuration.');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->deleteOld($this->retention);
return Command::SUCCESS;
}
}

View File

@@ -93,7 +93,7 @@ class JobShowCommand extends Command
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$io->write($exceptions);
}

View File

@@ -38,7 +38,7 @@ class SchemaCommand extends Command
// add -- before each keys
$options = array_combine(
array_map(fn ($key) => '--'.$key, array_keys($options)),
array_map(static fn ($key) => '--'.$key, array_keys($options)),
array_values($options)
);

View File

@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(name: 'code-rhapsodie:set_crashed', description: 'Set long running jobs as crashed.')]
class SetCrashedCommand extends Command
{
public function __construct(private JobRepository $jobRepository, private int $crashedDelay)
{
parent::__construct();
}
protected function configure()
{
$this->setHelp('How long jobs have to run before they are set as crashed can be configured with the "job_history.crashed_delay" configuration.');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->jobRepository->crashLongRunning($this->crashedDelay);
return Command::SUCCESS;
}
}

View File

@@ -67,7 +67,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
*/
public function setAfterItemProcessors(array $processors): self
{
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
$this->afterItemProcessors = array_map(static fn (callable $callable) => \Closure::fromCallable($callable), $processors);
return $this;
}

View File

@@ -30,6 +30,8 @@ class CodeRhapsodieDataflowExtension extends Extension
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
$container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']);
$container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']);
if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);

View File

@@ -52,6 +52,20 @@ class Configuration implements ConfigurationInterface
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->arrayNode('job_history')
->addDefaultsIfNotSet()
->children()
->integerNode('retention')
->defaultValue(30)
->min(0)
->info('How many days completed and crashed jobs are kept when running the cleanup command.')
->end()
->integerNode('crashed_delay')
->defaultValue(24)
->min(24)
->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.')
->end()
->end()
->end()
;

View File

@@ -17,6 +17,7 @@ class Job
public const STATUS_RUNNING = 1;
public const STATUS_COMPLETED = 2;
public const STATUS_QUEUED = 3;
public const STATUS_CRASHED = 4;
private const KEYS = [
'id',

View File

@@ -151,6 +151,33 @@ class JobRepository
return $qb;
}
public function crashLongRunning(int $hours): void
{
$qb = $this->connection->createQueryBuilder();
$qb->update(static::TABLE_NAME, 'j')
->set('j.status', ':new_status')
->set('j.end_time', ':now')
->andWhere('j.status = :status')
->andWhere('j.start_time < :date')
->setParameter('status', Job::STATUS_RUNNING)
->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime')
->setParameter('new_status', Job::STATUS_CRASHED)
->setParameter('now', new \DateTime(), 'datetime')
->executeStatement()
;
}
public function deleteOld(int $days): void
{
$qb = $this->connection->createQueryBuilder();
$qb->delete(static::TABLE_NAME, 'j')
->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED]))
->andWhere('j.end_time < :date')
->setParameter('date', new \DateTime("- {$days} days"), 'datetime')
->executeStatement()
;
}
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();

View File

@@ -3,7 +3,7 @@ services:
public: false
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry'
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry:
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~
CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand:
arguments:
@@ -100,3 +100,15 @@ services:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'
CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$retention: '%coderhapsodie.dataflow.job_history.retention%'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%'
tags: ['console.command']