1 Commits

Author SHA1 Message Date
AUDUL
4e82b114c4 Add auto update count processed item while running job (#79)
* Add auto update count processed item while running job
2025-10-02 15:22:37 +02:00
51 changed files with 290 additions and 290 deletions

View File

@@ -10,3 +10,5 @@ jobs:
- uses: actions/checkout@v4
- uses: php-actions/composer@v6 # or alternative dependency management
- uses: php-actions/phpunit@v4
- name: Run PHP CS Fixer
run: php vendor/bin/php-cs-fixer fix --dry-run --diff

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@ composer.lock
.idea
.phpunit.cache
.php-version
.php-cs-fixer.cache

49
.php-cs-fixer.dist.php Normal file
View File

@@ -0,0 +1,49 @@
<?php
$finder = PhpCsFixer\Finder::create()
->in('src')
//->in('tests')
->files()->name('*.php');
$config = new PhpCsFixer\Config();
$config->setRules([
'@Symfony' => true,
'@Symfony:risky' => true,
'@PSR12' => true,
'array_syntax' => [
'syntax' => 'short',
],
'combine_consecutive_unsets' => true,
'native_function_invocation' => [
'include' => [
'@compiler_optimized',
],
],
'no_extra_blank_lines' => [
'tokens' => [
'break',
'continue',
'extra',
'return',
'throw',
'use',
'parenthesis_brace_block',
'square_brace_block',
'curly_brace_block',
],
],
'ordered_class_elements' => true,
'ordered_imports' => true,
'yoda_style' => [
'equal' => false,
'identical' => false,
'less_and_greater' => false,
'always_move_variable' => false,
],
])
->setRiskyAllowed(true)
->setFinder(
$finder
);
return $config;

View File

@@ -1,12 +0,0 @@
<?php
$finder = PhpCsFixer\Finder::create()->in(__DIR__.'/src');
return PhpCsFixer\Config::create()
->setRules([
'@Symfony' => true,
'declare_strict_types' => true,
])
->setFinder($finder)
->setRiskyAllowed(true)
;

View File

@@ -1,3 +1,6 @@
# Version 5.3.0
* Added auto update count processed item while running job
# Version 5.2.0
* Added custom index for job status

View File

@@ -44,9 +44,7 @@ class CollectionWriterTest extends TestCase
$embeddedWriter
->expects($matcher)
->method('write')
->with($this->callback(function ($arg) use ($matcher, $values) {
return $arg === $values[$matcher->numberOfInvocations() - 1];
}))
->with($this->callback(fn($arg) => $arg === $values[$matcher->numberOfInvocations() - 1]))
;
$writer = new CollectionWriter($embeddedWriter);

View File

@@ -11,7 +11,7 @@ class PortWriterAdapterTest extends TestCase
{
$value = 'not an array';
$writer = $this->getMockBuilder('\Port\Writer')
$writer = $this->getMockBuilder(\Port\Writer::class)
->onlyMethods(['prepare', 'finish', 'writeItem'])
->getMock()
;

View File

@@ -49,15 +49,10 @@ class ScheduledDataflowManagerTest extends TestCase
$this->jobRepository
->expects($matcher)
->method('findPendingForScheduledDataflow')
->with($this->callback(function ($arg) use ($matcher, $scheduled1, $scheduled2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $scheduled1;
case 2:
return $arg === $scheduled2;
default:
return false;
}
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
1 => $arg === $scheduled1,
2 => $arg === $scheduled2,
default => false,
}))
->willReturnOnConsecutiveCalls(new Job(), null)
;

View File

@@ -44,18 +44,11 @@ class JobProcessorTest extends TestCase
->expects($matcher)
->method('dispatch')
->with(
$this->callback(function ($arg) use ($job) {
return $arg instanceof ProcessingEvent && $arg->getJob() === $job;
}),
$this->callback(function ($arg) use ($matcher) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === Events::BEFORE_PROCESSING;
case 2:
return $arg === Events::AFTER_PROCESSING;
default:
return false;
}
$this->callback(fn($arg) => $arg instanceof ProcessingEvent && $arg->getJob() === $job),
$this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
1 => $arg === Events::BEFORE_PROCESSING,
2 => $arg === Events::AFTER_PROCESSING,
default => false,
})
);

View File

@@ -39,15 +39,10 @@ class MessengerDataflowRunnerTest extends TestCase
$this->repository
->expects($matcher)
->method('save')
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $job1;
case 2:
return $arg === $job2;
default:
return false;
}
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
1 => $arg === $job1,
2 => $arg === $job2,
default => false,
}))
;
@@ -55,15 +50,10 @@ class MessengerDataflowRunnerTest extends TestCase
$this->bus
->expects($matcher)
->method('dispatch')
->with($this->callback(function ($arg) use ($matcher, $id1, $id2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg instanceof JobMessage && $arg->getJobId() === $id1;
case 2:
return $arg instanceof JobMessage && $arg->getJobId() === $id2;
default:
return false;
}
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
1 => $arg instanceof JobMessage && $arg->getJobId() === $id1,
2 => $arg instanceof JobMessage && $arg->getJobId() === $id2,
default => false,
}))
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),

View File

@@ -38,15 +38,10 @@ class PendingDataflowRunnerTest extends TestCase
$this->processor
->expects($matcher)
->method('process')
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $job1;
case 2:
return $arg === $job2;
default:
return false;
}
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
1 => $arg === $job1,
2 => $arg === $job2,
default => false,
}))
;

View File

@@ -60,9 +60,10 @@
},
"require-dev": {
"amphp/amp": "^2.5",
"friendsofphp/php-cs-fixer": "^3.75",
"phpunit/phpunit": "^11",
"portphp/portphp": "^1.9",
"rector/rector": "^1.0",
"rector/rector": "^2.0",
"symfony/messenger": "^7.0"
},
"suggest": {

View File

@@ -17,7 +17,7 @@ return static function (RectorConfig $rectorConfig): void {
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
$rectorConfig->sets([
SymfonySetList::SYMFONY_60,
SymfonySetList::SYMFONY_70,
SymfonySetList::SYMFONY_CODE_QUALITY,
SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION,
LevelSetList::UP_TO_PHP_80,

View File

@@ -27,29 +27,27 @@ class AddScheduledDataflowCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setHelp('The <info>%command.name%</info> allows you to create a new scheduled dataflow.')
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
->addOption('options', null, InputOption::VALUE_OPTIONAL,
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
->addOption(
'options',
null,
InputOption::VALUE_OPTIONAL,
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})'
)
->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$choices = [];
@@ -71,13 +69,17 @@ class AddScheduledDataflowCommand extends Command
}
$options = $input->getOption('options');
if (!$options) {
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
json_encode([]));
$options = $io->ask(
'What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
json_encode([])
);
}
$frequency = $input->getOption('frequency');
if (!$frequency) {
$frequency = $io->choice('What is the frequency for the scheduled dataflow?',
ScheduledDataflow::AVAILABLE_FREQUENCIES);
$frequency = $io->choice(
'What is the frequency for the scheduled dataflow?',
ScheduledDataflow::AVAILABLE_FREQUENCIES
);
}
$firstRun = $input->getOption('first_run');
if (!$firstRun) {
@@ -92,22 +94,25 @@ class AddScheduledDataflowCommand extends Command
'id' => null,
'label' => $label,
'dataflow_type' => $type,
'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
'options' => json_decode($options, true, 512, \JSON_THROW_ON_ERROR),
'frequency' => $frequency,
'next' => new \DateTime($firstRun),
'enabled' => $enabled,
]);
$errors = $this->validator->validate($newScheduledDataflow);
if (count($errors) > 0) {
if (\count($errors) > 0) {
$io->error((string) $errors);
return 2;
}
$this->scheduledDataflowRepository->save($newScheduledDataflow);
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.',
$newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
$io->success(\sprintf(
'New scheduled dataflow "%s" (id:%d) was created successfully.',
$newScheduledDataflow->getLabel(),
$newScheduledDataflow->getId()
));
return 0;
}

View File

@@ -26,9 +26,6 @@ class ChangeScheduleStatusCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
@@ -39,12 +36,9 @@ class ChangeScheduleStatusCommand extends Command
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
@@ -52,7 +46,7 @@ class ChangeScheduleStatusCommand extends Command
$schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id'));
if (!$schedule) {
$io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
$io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
return 1;
}
@@ -71,9 +65,9 @@ class ChangeScheduleStatusCommand extends Command
try {
$schedule->setEnabled($input->getOption('enable'));
$this->scheduledDataflowRepository->save($schedule);
$io->success(sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
$io->success(\sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
} catch (\Exception $e) {
$io->error(sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
$io->error(\sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
return 4;
}

View File

@@ -27,9 +27,6 @@ class DatabaseSchemaCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
@@ -43,7 +40,7 @@ class DatabaseSchemaCommand extends Command
{
$io = new SymfonyStyle($input, $output);
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
@@ -61,7 +58,7 @@ class DatabaseSchemaCommand extends Command
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
if (in_array($table->getName(), $tableArray)) {
if (\in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
@@ -90,14 +87,14 @@ class DatabaseSchemaCommand extends Command
if ($input->getOption('dump-sql')) {
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql . ';');
$io->text($sql.';');
}
return Command::SUCCESS;
}
if (!$io->askQuestion(new ConfirmationQuestion('Are you sure to update database ?', true))) {
$io->text("Execution canceled.");
$io->text('Execution canceled.');
return Command::SUCCESS;
}
@@ -106,7 +103,7 @@ class DatabaseSchemaCommand extends Command
$connection->executeQuery($sql);
}
$io->success(sprintf('%d queries executed.', \count($sqls)));
$io->success(\sprintf('%d queries executed.', \count($sqls)));
return parent::SUCCESS;
}

View File

@@ -4,8 +4,10 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\DataflowType\AutoUpdateCountInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Attribute\AsCommand;
@@ -26,18 +28,16 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository)
{
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setHelp(<<<'EOF'
->setHelp(
<<<'EOF'
The <info>%command.name%</info> command runs one dataflow with the provided options.
<info>php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}'</info>
@@ -48,19 +48,20 @@ EOF
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR);
$options = json_decode($input->getArgument('options'), true, 512, \JSON_THROW_ON_ERROR);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
if ($dataflowType instanceof AutoUpdateCountInterface) {
$dataflowType->setRepository($this->jobRepository);
}
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
$dataflowType->setLogger($this->logger);
}

View File

@@ -31,9 +31,6 @@ class JobShowCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
@@ -44,12 +41,9 @@ class JobShowCommand extends Command
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
@@ -73,7 +67,7 @@ class JobShowCommand extends Command
return 2;
}
if (null === $job) {
if ($job === null) {
$io->error('Cannot find job :/');
return 3;
@@ -87,19 +81,19 @@ class JobShowCommand extends Command
['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
['Object number', $job->getCount()],
['Errors', count((array) $job->getExceptions())],
['Errors', \count((array) $job->getExceptions())],
['Status', $this->translateStatus($job->getStatus())],
];
if ($input->getOption('details')) {
$display[] = ['Type', $job->getDataflowType()];
$display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)];
$display[] = ['Options', json_encode($job->getOptions(), \JSON_THROW_ON_ERROR)];
$io->section('Summary');
}
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$io->write($exceptions);
}

View File

@@ -29,22 +29,17 @@ class RunPendingDataflowsCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setHelp(<<<'EOF'
->setHelp(
<<<'EOF'
The <info>%command.name%</info> command runs dataflows according to the schedule defined in the UI by the user.
EOF
)
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
@@ -53,7 +48,7 @@ EOF
return 0;
}
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}

View File

@@ -24,9 +24,6 @@ class ScheduleListCommand extends Command
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
@@ -34,12 +31,9 @@ class ScheduleListCommand extends Command
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
if ($input->getOption('connection') !== null) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);

View File

@@ -4,13 +4,6 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
@@ -21,14 +14,12 @@ use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*
* @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead.
*/
#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')]
class SchemaCommand extends Command
{
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
@@ -38,9 +29,6 @@ class SchemaCommand extends Command
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
@@ -48,9 +36,9 @@ class SchemaCommand extends Command
$options = array_filter($input->getOptions());
//add -- before each keys
// add -- before each keys
$options = array_combine(
array_map(fn($key) => '--' . $key, array_keys($options)),
array_map(fn ($key) => '--'.$key, array_keys($options)),
array_values($options)
);
@@ -58,7 +46,7 @@ class SchemaCommand extends Command
$inputArray = new ArrayInput([
'command' => 'code-rhapsodie:dataflow:database-schema',
...$options
...$options,
]);
return $this->getApplication()->doRun($inputArray, $output);

View File

@@ -7,7 +7,6 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{

View File

@@ -4,15 +4,20 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface
{
use LoggerAwareTrait;
private JobRepository $repository;
private ?\DateTime $saveDate = null;
/**
* @codeCoverageIgnore
*/
@@ -21,14 +26,24 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
return [];
}
public function process(array $options): Result
public function process(array $options, ?int $jobId = null): Result
{
$this->saveDate = new \DateTime('+1 minute');
$optionsResolver = new OptionsResolver();
$this->configureOptions($optionsResolver);
$options = $optionsResolver->resolve($options);
$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
$builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) {
if ($jobId === null || $this->saveDate > new \DateTime()) {
return;
}
$this->repository->updateCount($jobId, $count);
$this->saveDate = new \DateTime('+1 minute');
});
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
@@ -38,6 +53,11 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
return $dataflow->process();
}
public function setRepository(JobRepository $repository): void
{
$this->repository = $repository;
}
protected function createDataflowBuilder(): DataflowBuilder
{
return new DataflowBuilder();

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
interface AutoUpdateCountInterface
{
public function setRepository(JobRepository $repository): void;
}

View File

@@ -5,18 +5,19 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use function Amp\coroutine;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use function Amp\Promise\wait;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use RuntimeException;
use Throwable;
class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
@@ -34,8 +35,8 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
public function __construct(private iterable $reader, private ?string $name, private ?int $loopInterval = 0, private ?int $emitInterval = 0)
{
if (!function_exists('Amp\\Promise\\wait')) {
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
if (!\function_exists('Amp\\Promise\\wait')) {
throw new \RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}
}
@@ -61,9 +62,6 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
return $this;
}
/**
* {@inheritdoc}
*/
public function process(): Result
{
$count = 0;
@@ -76,7 +74,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
}
$deferred = new Deferred();
$resolved = false; //missing $deferred->isResolved() in version 2.5
$resolved = false; // missing $deferred->isResolved() in version 2.5
$producer = new Producer(function (callable $emit) {
foreach ($this->reader as $index => $item) {
yield new Delayed($this->emitInterval);
@@ -89,7 +87,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
$it = $producer->getCurrent();
[$index, $item] = $it;
$this->states[$index] = [$index, 0, $item];
} elseif (!$resolved && 0 === count($this->states)) {
} elseif (!$resolved && \count($this->states) === 0) {
$resolved = true;
$deferred->resolve();
}
@@ -120,20 +118,20 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
private function processState(mixed $state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < count($this->steps)) {
if ($stepIndex < \count($this->steps)) {
if (!isset($this->stepsJobs[$stepIndex])) {
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
if ((is_countable($this->stepsJobs[$stepIndex]) ? \count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);
$promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
$promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
if ($exception) {
$exceptions[$stepIndex] = $exception;
$this->logException($exception, (string) $stepIndex);
} elseif (false === $newItem) {
} elseif ($newItem === false) {
unset($this->states[$readIndex]);
} else {
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
@@ -153,7 +151,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
}
}
private function logException(Throwable $e, ?string $index = null): void
private function logException(\Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;

View File

@@ -21,6 +21,13 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
private ?\Closure $customExceptionIndex = null;
private ?\DateTimeInterface $dateTime = null;
/**
* @var \Closure[]
*/
private array $afterItemProcessors = [];
public function __construct(private iterable $reader, private ?string $name)
{
}
@@ -56,8 +63,15 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
}
/**
* {@inheritdoc}
* @param array<callable> $processors
*/
public function setAfterItemProcessors(array $processors): self
{
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
return $this;
}
public function process(): Result
{
$count = 0;
@@ -75,7 +89,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
} catch (\Throwable $e) {
$exceptionIndex = $index;
try {
if (is_callable($this->customExceptionIndex)) {
if (\is_callable($this->customExceptionIndex)) {
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
@@ -87,6 +101,10 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
}
++$count;
foreach ($this->afterItemProcessors as $afterItemProcessor) {
$afterItemProcessor($index, $item, $count);
}
}
foreach ($this->writers as $writer) {
@@ -103,9 +121,9 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
private function processItem(mixed $item): void
{
foreach ($this->steps as $step) {
$item = call_user_func($step, $item);
$item = \call_user_func($step, $item);
if (false === $item) {
if ($item === false) {
return;
}
}

View File

@@ -18,6 +18,10 @@ class DataflowBuilder
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
/**
* @var \Closure[]
*/
private array $afterItemProcessors = [];
public function setName(string $name): self
{
@@ -54,6 +58,13 @@ class DataflowBuilder
return $this;
}
public function addAfterItemProcessor(callable $callable): self
{
$this->afterItemProcessors[] = \Closure::fromCallable($callable);
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new Dataflow($this->reader, $this->name);
@@ -69,10 +80,12 @@ class DataflowBuilder
$dataflow->addWriter($writer);
}
if (is_callable($this->customExceptionIndex)) {
if (\is_callable($this->customExceptionIndex)) {
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
}
$dataflow->setAfterItemProcessors($this->afterItemProcessors);
return $dataflow;
}
}

View File

@@ -10,5 +10,5 @@ interface DataflowTypeInterface
public function getAliases(): iterable;
public function process(array $options): Result;
public function process(array $options, ?int $jobId = null): Result;
}

View File

@@ -20,7 +20,7 @@ class Result
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
{
$this->elapsed = $startTime->diff($endTime);
$this->errorCount = count($exceptions);
$this->errorCount = \count($exceptions);
$this->successCount = $totalProcessedCount - $this->errorCount;
$this->exceptions = $exceptions;
}

View File

@@ -18,21 +18,15 @@ class CollectionWriter implements DelegateWriterInterface
{
}
/**
* {@inheritdoc}
*/
public function prepare()
{
$this->writer->prepare();
}
/**
* {@inheritdoc}
*/
public function write($collection)
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
}
foreach ($collection as $item) {
@@ -40,17 +34,11 @@ class CollectionWriter implements DelegateWriterInterface
}
}
/**
* {@inheritdoc}
*/
public function finish()
{
$this->writer->finish();
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
return is_iterable($item);

View File

@@ -11,8 +11,6 @@ interface DelegateWriterInterface extends WriterInterface
{
/**
* Returns true if the argument is of a supported type.
*
* @param $item
*/
public function supports($item): bool;
}

View File

@@ -21,9 +21,6 @@ class DelegatorWriter implements DelegateWriterInterface
{
}
/**
* {@inheritdoc}
*/
public function prepare()
{
foreach ($this->delegates as $delegate) {
@@ -31,9 +28,6 @@ class DelegatorWriter implements DelegateWriterInterface
}
}
/**
* {@inheritdoc}
*/
public function write($item)
{
foreach ($this->delegates as $delegate) {
@@ -46,12 +40,9 @@ class DelegatorWriter implements DelegateWriterInterface
return;
}
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
}
/**
* {@inheritdoc}
*/
public function finish()
{
foreach ($this->delegates as $delegate) {
@@ -59,9 +50,6 @@ class DelegatorWriter implements DelegateWriterInterface
}
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
foreach ($this->delegates as $delegate) {

View File

@@ -12,9 +12,6 @@ use Symfony\Component\DependencyInjection\Reference;
class BusCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
@@ -23,7 +20,7 @@ class BusCompilerPass implements CompilerPassInterface
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
if (!$container->has($bus)) {
throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $bus));
}
if (!$container->has(MessengerDataflowRunner::class)) {

View File

@@ -16,9 +16,6 @@ use Symfony\Component\DependencyInjection\Reference;
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
if (!$container->has(DataflowTypeRegistry::class)) {

View File

@@ -12,9 +12,6 @@ use Symfony\Component\DependencyInjection\Reference;
class DefaultLoggerCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');

View File

@@ -10,7 +10,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
public function getConfigTreeBuilder(): TreeBuilder
{
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
$rootNode = $treeBuilder->getRootNode();
@@ -34,7 +34,7 @@ class Configuration implements ConfigurationInterface
->end()
->end()
->validate()
->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
->ifTrue(static fn ($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()

View File

@@ -75,19 +75,19 @@ class Job
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
if (\count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$job = new self();
$job->id = null === $datas['id'] ? null : (int) $datas['id'];
$job->setStatus(null === $datas['status'] ? null : (int) $datas['status']);
$job->id = $datas['id'] === null ? null : (int) $datas['id'];
$job->setStatus($datas['status'] === null ? null : (int) $datas['status']);
$job->setLabel($datas['label']);
$job->setDataflowType($datas['dataflow_type']);
$job->setOptions($datas['options']);
$job->setRequestedDate($datas['requested_date']);
$job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']);
$job->setCount(null === $datas['count'] ? null : (int) $datas['count']);
$job->setScheduledDataflowId($datas['scheduled_dataflow_id'] === null ? null : (int) $datas['scheduled_dataflow_id']);
$job->setCount($datas['count'] === null ? null : (int) $datas['count']);
$job->setExceptions($datas['exceptions']);
$job->setStartTime($datas['start_time']);
$job->setEndTime($datas['end_time']);
@@ -112,7 +112,7 @@ class Job
];
}
public function setId(int $id): Job
public function setId(int $id): self
{
$this->id = $id;
@@ -129,7 +129,7 @@ class Job
return $this->status;
}
public function setStatus(int $status): Job
public function setStatus(int $status): self
{
$this->status = $status;
@@ -141,7 +141,7 @@ class Job
return $this->label;
}
public function setLabel(?string $label): Job
public function setLabel(?string $label): self
{
$this->label = $label;
@@ -153,7 +153,7 @@ class Job
return $this->dataflowType;
}
public function setDataflowType(?string $dataflowType): Job
public function setDataflowType(?string $dataflowType): self
{
$this->dataflowType = $dataflowType;
@@ -165,7 +165,7 @@ class Job
return $this->options;
}
public function setOptions(?array $options): Job
public function setOptions(?array $options): self
{
$this->options = $options;
@@ -177,7 +177,7 @@ class Job
return $this->requestedDate;
}
public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
public function setRequestedDate(?\DateTimeInterface $requestedDate): self
{
$this->requestedDate = $requestedDate;
@@ -189,7 +189,7 @@ class Job
return $this->scheduledDataflowId;
}
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
public function setScheduledDataflowId(?int $scheduledDataflowId): self
{
$this->scheduledDataflowId = $scheduledDataflowId;
@@ -201,7 +201,7 @@ class Job
return $this->count;
}
public function setCount(?int $count): Job
public function setCount(?int $count): self
{
$this->count = $count;
@@ -213,7 +213,7 @@ class Job
return $this->exceptions;
}
public function setExceptions(?array $exceptions): Job
public function setExceptions(?array $exceptions): self
{
$this->exceptions = $exceptions;
@@ -225,7 +225,7 @@ class Job
return $this->startTime;
}
public function setStartTime(?\DateTimeInterface $startTime): Job
public function setStartTime(?\DateTimeInterface $startTime): self
{
$this->startTime = $startTime;
@@ -237,7 +237,7 @@ class Job
return $this->endTime;
}
public function setEndTime(?\DateTimeInterface $endTime): Job
public function setEndTime(?\DateTimeInterface $endTime): self
{
$this->endTime = $endTime;

View File

@@ -50,19 +50,19 @@ class ScheduledDataflow
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
if (\count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$scheduledDataflow = new self();
$scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id'];
$scheduledDataflow->id = $datas['id'] === null ? null : (int) $datas['id'];
$scheduledDataflow->setLabel($datas['label']);
$scheduledDataflow->setDataflowType($datas['dataflow_type']);
$scheduledDataflow->setOptions($datas['options']);
$scheduledDataflow->setFrequency($datas['frequency']);
$scheduledDataflow->setNext($datas['next']);
$scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']);
$scheduledDataflow->setEnabled($datas['enabled'] === null ? null : (bool) $datas['enabled']);
return $scheduledDataflow;
}
@@ -80,7 +80,7 @@ class ScheduledDataflow
];
}
public function setId(int $id): ScheduledDataflow
public function setId(int $id): self
{
$this->id = $id;
@@ -97,7 +97,7 @@ class ScheduledDataflow
return $this->label;
}
public function setLabel(?string $label): ScheduledDataflow
public function setLabel(?string $label): self
{
$this->label = $label;
@@ -109,7 +109,7 @@ class ScheduledDataflow
return $this->dataflowType;
}
public function setDataflowType(?string $dataflowType): ScheduledDataflow
public function setDataflowType(?string $dataflowType): self
{
$this->dataflowType = $dataflowType;
@@ -121,7 +121,7 @@ class ScheduledDataflow
return $this->options;
}
public function setOptions(?array $options): ScheduledDataflow
public function setOptions(?array $options): self
{
$this->options = $options;
@@ -133,7 +133,7 @@ class ScheduledDataflow
return $this->frequency;
}
public function setFrequency(?string $frequency): ScheduledDataflow
public function setFrequency(?string $frequency): self
{
$this->frequency = $frequency;
@@ -145,7 +145,7 @@ class ScheduledDataflow
return $this->next;
}
public function setNext(?\DateTimeInterface $next): ScheduledDataflow
public function setNext(?\DateTimeInterface $next): self
{
$this->next = $next;
@@ -157,7 +157,7 @@ class ScheduledDataflow
return $this->enabled;
}
public function setEnabled(?bool $enabled): ScheduledDataflow
public function setEnabled(?bool $enabled): self
{
$this->enabled = $enabled;

View File

@@ -11,7 +11,7 @@ class UnknownDataflowTypeException extends \Exception
{
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
{
return new self(sprintf(
return new self(\sprintf(
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
$aliasOrFqcn,
implode(', ', $knownDataflowTypes)

View File

@@ -24,6 +24,6 @@ class ConnectionFactory
public function getConnection(): \Doctrine\DBAL\Connection
{
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
return $this->container->get(\sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}
}

View File

@@ -7,7 +7,6 @@ namespace CodeRhapsodie\DataflowBundle\Logger;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
use Monolog\LogRecord;
class BufferHandler extends AbstractProcessingHandler

View File

@@ -16,7 +16,7 @@ final class DelegatingLogger extends AbstractLogger
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
throw new \InvalidArgumentException(\sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
}
$this->loggers[] = $logger;

View File

@@ -19,15 +19,12 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
{
}
/**
* {@inheritdoc}
*/
public function createJobsFromScheduledDataflows(): void
{
$this->connection->beginTransaction();
try {
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
if ($this->jobRepository->findPendingForScheduledDataflow($scheduled) !== null) {
continue;
}

View File

@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
@@ -30,6 +31,10 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
if ($dataflowType instanceof RepositoryInterface) {
$dataflowType->setRepository($this->repository);
}
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
@@ -40,7 +45,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
$result = $dataflowType->process($job->getOptions(), $job->getId());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {

View File

@@ -18,9 +18,6 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
/** @var array|DataflowTypeInterface[] */
private array $aliasesRegistry = [];
/**
* {@inheritdoc}
*/
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface
{
if (isset($this->fqcnRegistry[$fqcnOrAlias])) {
@@ -34,17 +31,11 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]);
}
/**
* {@inheritdoc}
*/
public function listDataflowTypes(): iterable
{
return $this->fqcnRegistry;
}
/**
* {@inheritdoc}
*/
public function registerDataflowType(DataflowTypeInterface $dataflowType): void
{
$this->fqcnRegistry[$dataflowType::class] = $dataflowType;

View File

@@ -14,7 +14,7 @@ trait InitFromDbTrait
private function initDateTime(array $datas): array
{
foreach ($this->getFields() as $key => $type) {
if ('datetime' === $type && null !== $datas[$key]) {
if ($type === 'datetime' && $datas[$key] !== null) {
$datas[$key] = new \DateTime($datas[$key]);
}
}
@@ -24,10 +24,10 @@ trait InitFromDbTrait
private function initArray(array $datas): array
{
if (!is_array($datas['options'])) {
if (!\is_array($datas['options'])) {
$datas['options'] = $this->strToArray($datas['options']);
}
if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) {
if (\array_key_exists('exceptions', $datas) && !\is_array($datas['exceptions'])) {
$datas['exceptions'] = $this->strToArray($datas['exceptions']);
}
@@ -36,12 +36,12 @@ trait InitFromDbTrait
private function strToArray($value): array
{
if (null === $value) {
if ($value === null) {
return [];
}
$array = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
$array = json_decode($value, true, 512, \JSON_THROW_ON_ERROR);
return (false === $array) ? [] : $array;
return ($array === false) ? [] : $array;
}
}

View File

@@ -45,7 +45,7 @@ class JobRepository
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -93,7 +93,7 @@ class JobRepository
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -108,7 +108,7 @@ class JobRepository
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -121,14 +121,14 @@ class JobRepository
$datas = $job->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
if (\is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
}
if (is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR);
if (\is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions'], \JSON_THROW_ON_ERROR);
}
if (null === $job->getId()) {
if ($job->getId() === null) {
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$job->setId((int) $this->connection->lastInsertId());
@@ -137,6 +137,11 @@ class JobRepository
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields());
}
public function updateCount(int $jobId, int $count): void
{
$this->connection->update(static::TABLE_NAME, ['count' => $count], ['id' => $jobId]);
}
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
@@ -149,7 +154,7 @@ class JobRepository
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return null;
}

View File

@@ -41,7 +41,7 @@ class ScheduledDataflowRepository
;
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -65,7 +65,7 @@ class ScheduledDataflowRepository
$qb->orderBy('label', 'ASC');
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
@@ -90,11 +90,11 @@ class ScheduledDataflowRepository
$datas = $scheduledDataflow->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
if (\is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
}
if (null === $scheduledDataflow->getId()) {
if ($scheduledDataflow->getId() === null) {
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
@@ -129,7 +129,7 @@ class ScheduledDataflowRepository
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
if ($stmt->rowCount() === 0) {
return null;
}

View File

@@ -23,6 +23,7 @@ services:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:

View File

@@ -13,9 +13,6 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
}
/**
* {@inheritdoc}
*/
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {

View File

@@ -10,22 +10,19 @@ use Symfony\Component\Validator\Exception\UnexpectedTypeException;
class FrequencyValidator extends ConstraintValidator
{
/**
* {@inheritdoc}
*/
public function validate(mixed $value, Constraint $constraint)
{
if (!$constraint instanceof Frequency) {
throw new UnexpectedTypeException($constraint, Frequency::class);
}
if (null === $value) {
if ($value === null) {
return;
}
try {
$interval = \DateInterval::createFromDateString($value);
} catch (\Exception){
} catch (\Exception) {
$interval = false;
}