mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
2 Commits
master
...
job_retent
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83f00e79d3 | ||
|
|
ee5703f0ab |
@@ -1,3 +1,6 @@
|
||||
# Version 5.5.0
|
||||
* Added cleanup commands
|
||||
|
||||
# Version 5.4.1
|
||||
* Fix File Exceptions integration
|
||||
|
||||
|
||||
@@ -243,7 +243,6 @@ implementing `DataflowTypeInterface`.
|
||||
|
||||
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
|
||||
|
||||
```yaml
|
||||
```yaml
|
||||
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
|
||||
tags:
|
||||
@@ -598,6 +597,10 @@ the messenger component instead.
|
||||
|
||||
`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries
|
||||
|
||||
`code-rhapsodie:dataflow:set_crashed` Jobs that have been in the "running" status for too long will be set in the "crashed" status.
|
||||
|
||||
`code-rhapsodie:dataflow:job_cleanup` Remove old completed or crashed jobs
|
||||
|
||||
### Work with many databases
|
||||
|
||||
All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.
|
||||
|
||||
32
src/Command/JobCleanupCommand.php
Normal file
32
src/Command/JobCleanupCommand.php
Normal file
@@ -0,0 +1,32 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
#[AsCommand(name: 'code-rhapsodie:job_cleanup', description: 'Cleanup job history.')]
|
||||
class JobCleanupCommand extends Command
|
||||
{
|
||||
public function __construct(private JobRepository $jobRepository, private int $retention)
|
||||
{
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
$this->setHelp('Job retention can be configured with the "job_history.retention" configuration.');
|
||||
}
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$this->jobRepository->deleteOld($this->retention);
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -93,7 +93,7 @@ class JobShowCommand extends Command
|
||||
$io->table(['Field', 'Value'], $display);
|
||||
if ($input->getOption('details')) {
|
||||
$io->section('Exceptions');
|
||||
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
|
||||
$exceptions = array_map(static fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
|
||||
|
||||
$io->write($exceptions);
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ class SchemaCommand extends Command
|
||||
|
||||
// add -- before each keys
|
||||
$options = array_combine(
|
||||
array_map(fn ($key) => '--'.$key, array_keys($options)),
|
||||
array_map(static fn ($key) => '--'.$key, array_keys($options)),
|
||||
array_values($options)
|
||||
);
|
||||
|
||||
|
||||
32
src/Command/SetCrashedCommand.php
Normal file
32
src/Command/SetCrashedCommand.php
Normal file
@@ -0,0 +1,32 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
#[AsCommand(name: 'code-rhapsodie:set_crashed', description: 'Set long running jobs as crashed.')]
|
||||
class SetCrashedCommand extends Command
|
||||
{
|
||||
public function __construct(private JobRepository $jobRepository, private int $crashedDelay)
|
||||
{
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
protected function configure()
|
||||
{
|
||||
$this->setHelp('How long jobs have to run before they are set as crashed can be configured with the "job_history.crashed_delay" configuration.');
|
||||
}
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$this->jobRepository->crashLongRunning($this->crashedDelay);
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -67,7 +67,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
*/
|
||||
public function setAfterItemProcessors(array $processors): self
|
||||
{
|
||||
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
|
||||
$this->afterItemProcessors = array_map(static fn (callable $callable) => \Closure::fromCallable($callable), $processors);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
@@ -30,6 +30,8 @@ class CodeRhapsodieDataflowExtension extends Extension
|
||||
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
|
||||
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
|
||||
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
|
||||
$container->setParameter('coderhapsodie.dataflow.job_history.retention', $config['job_history']['retention']);
|
||||
$container->setParameter('coderhapsodie.dataflow.job_history.crashed_delay', $config['job_history']['crashed_delay']);
|
||||
|
||||
if ($config['exceptions_mode']['type'] === 'file') {
|
||||
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
|
||||
|
||||
@@ -52,6 +52,20 @@ class Configuration implements ConfigurationInterface
|
||||
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
|
||||
->end()
|
||||
->end()
|
||||
->arrayNode('job_history')
|
||||
->addDefaultsIfNotSet()
|
||||
->children()
|
||||
->integerNode('retention')
|
||||
->defaultValue(30)
|
||||
->min(0)
|
||||
->info('How many days completed and crashed jobs are kept when running the cleanup command.')
|
||||
->end()
|
||||
->integerNode('crashed_delay')
|
||||
->defaultValue(24)
|
||||
->min(24)
|
||||
->info('Jobs running for more than this many hours will be set as crashed when running the cleanup command.')
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ class Job
|
||||
public const STATUS_RUNNING = 1;
|
||||
public const STATUS_COMPLETED = 2;
|
||||
public const STATUS_QUEUED = 3;
|
||||
public const STATUS_CRASHED = 4;
|
||||
|
||||
private const KEYS = [
|
||||
'id',
|
||||
|
||||
@@ -151,6 +151,33 @@ class JobRepository
|
||||
return $qb;
|
||||
}
|
||||
|
||||
public function crashLongRunning(int $hours): void
|
||||
{
|
||||
$qb = $this->connection->createQueryBuilder();
|
||||
$qb->update(static::TABLE_NAME, 'j')
|
||||
->set('j.status', ':new_status')
|
||||
->set('j.end_time', ':now')
|
||||
->andWhere('j.status = :status')
|
||||
->andWhere('j.start_time < :date')
|
||||
->setParameter('status', Job::STATUS_RUNNING)
|
||||
->setParameter('date', new \DateTime("- {$hours} hours"), 'datetime')
|
||||
->setParameter('new_status', Job::STATUS_CRASHED)
|
||||
->setParameter('now', new \DateTime(), 'datetime')
|
||||
->executeStatement()
|
||||
;
|
||||
}
|
||||
|
||||
public function deleteOld(int $days): void
|
||||
{
|
||||
$qb = $this->connection->createQueryBuilder();
|
||||
$qb->delete(static::TABLE_NAME, 'j')
|
||||
->andWhere($qb->expr()->in('j.status', [Job::STATUS_COMPLETED, Job::STATUS_CRASHED]))
|
||||
->andWhere('j.end_time < :date')
|
||||
->setParameter('date', new \DateTime("- {$days} days"), 'datetime')
|
||||
->executeStatement()
|
||||
;
|
||||
}
|
||||
|
||||
private function returnFirstOrNull(QueryBuilder $qb): ?Job
|
||||
{
|
||||
$stmt = $qb->executeQuery();
|
||||
|
||||
@@ -3,7 +3,7 @@ services:
|
||||
public: false
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry'
|
||||
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry:
|
||||
CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry: ~
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand:
|
||||
arguments:
|
||||
@@ -100,3 +100,15 @@ services:
|
||||
arguments:
|
||||
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\JobCleanupCommand:
|
||||
arguments:
|
||||
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$retention: '%coderhapsodie.dataflow.job_history.retention%'
|
||||
tags: ['console.command']
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\SetCrashedCommand:
|
||||
arguments:
|
||||
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$crashedDelay: '%coderhapsodie.dataflow.job_history.crashed_delay%'
|
||||
tags: ['console.command']
|
||||
|
||||
Reference in New Issue
Block a user