mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e82b114c4 |
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -10,3 +10,5 @@ jobs:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: php-actions/composer@v6 # or alternative dependency management
|
||||
- uses: php-actions/phpunit@v4
|
||||
- name: Run PHP CS Fixer
|
||||
run: php vendor/bin/php-cs-fixer fix --dry-run --diff
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -6,3 +6,4 @@ composer.lock
|
||||
.idea
|
||||
.phpunit.cache
|
||||
.php-version
|
||||
.php-cs-fixer.cache
|
||||
49
.php-cs-fixer.dist.php
Normal file
49
.php-cs-fixer.dist.php
Normal file
@@ -0,0 +1,49 @@
|
||||
<?php
|
||||
|
||||
$finder = PhpCsFixer\Finder::create()
|
||||
->in('src')
|
||||
//->in('tests')
|
||||
->files()->name('*.php');
|
||||
|
||||
$config = new PhpCsFixer\Config();
|
||||
$config->setRules([
|
||||
'@Symfony' => true,
|
||||
'@Symfony:risky' => true,
|
||||
'@PSR12' => true,
|
||||
'array_syntax' => [
|
||||
'syntax' => 'short',
|
||||
],
|
||||
'combine_consecutive_unsets' => true,
|
||||
'native_function_invocation' => [
|
||||
'include' => [
|
||||
'@compiler_optimized',
|
||||
],
|
||||
],
|
||||
'no_extra_blank_lines' => [
|
||||
'tokens' => [
|
||||
'break',
|
||||
'continue',
|
||||
'extra',
|
||||
'return',
|
||||
'throw',
|
||||
'use',
|
||||
'parenthesis_brace_block',
|
||||
'square_brace_block',
|
||||
'curly_brace_block',
|
||||
],
|
||||
],
|
||||
'ordered_class_elements' => true,
|
||||
'ordered_imports' => true,
|
||||
'yoda_style' => [
|
||||
'equal' => false,
|
||||
'identical' => false,
|
||||
'less_and_greater' => false,
|
||||
'always_move_variable' => false,
|
||||
],
|
||||
])
|
||||
->setRiskyAllowed(true)
|
||||
->setFinder(
|
||||
$finder
|
||||
);
|
||||
|
||||
return $config;
|
||||
12
.php_cs.dist
12
.php_cs.dist
@@ -1,12 +0,0 @@
|
||||
<?php
|
||||
|
||||
$finder = PhpCsFixer\Finder::create()->in(__DIR__.'/src');
|
||||
|
||||
return PhpCsFixer\Config::create()
|
||||
->setRules([
|
||||
'@Symfony' => true,
|
||||
'declare_strict_types' => true,
|
||||
])
|
||||
->setFinder($finder)
|
||||
->setRiskyAllowed(true)
|
||||
;
|
||||
@@ -1,3 +1,6 @@
|
||||
# Version 5.3.0
|
||||
* Added auto update count processed item while running job
|
||||
|
||||
# Version 5.2.0
|
||||
* Added custom index for job status
|
||||
|
||||
|
||||
@@ -44,9 +44,7 @@ class CollectionWriterTest extends TestCase
|
||||
$embeddedWriter
|
||||
->expects($matcher)
|
||||
->method('write')
|
||||
->with($this->callback(function ($arg) use ($matcher, $values) {
|
||||
return $arg === $values[$matcher->numberOfInvocations() - 1];
|
||||
}))
|
||||
->with($this->callback(fn($arg) => $arg === $values[$matcher->numberOfInvocations() - 1]))
|
||||
;
|
||||
|
||||
$writer = new CollectionWriter($embeddedWriter);
|
||||
|
||||
@@ -11,7 +11,7 @@ class PortWriterAdapterTest extends TestCase
|
||||
{
|
||||
$value = 'not an array';
|
||||
|
||||
$writer = $this->getMockBuilder('\Port\Writer')
|
||||
$writer = $this->getMockBuilder(\Port\Writer::class)
|
||||
->onlyMethods(['prepare', 'finish', 'writeItem'])
|
||||
->getMock()
|
||||
;
|
||||
|
||||
@@ -49,15 +49,10 @@ class ScheduledDataflowManagerTest extends TestCase
|
||||
$this->jobRepository
|
||||
->expects($matcher)
|
||||
->method('findPendingForScheduledDataflow')
|
||||
->with($this->callback(function ($arg) use ($matcher, $scheduled1, $scheduled2) {
|
||||
switch ($matcher->numberOfInvocations()) {
|
||||
case 1:
|
||||
return $arg === $scheduled1;
|
||||
case 2:
|
||||
return $arg === $scheduled2;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
|
||||
1 => $arg === $scheduled1,
|
||||
2 => $arg === $scheduled2,
|
||||
default => false,
|
||||
}))
|
||||
->willReturnOnConsecutiveCalls(new Job(), null)
|
||||
;
|
||||
|
||||
@@ -44,18 +44,11 @@ class JobProcessorTest extends TestCase
|
||||
->expects($matcher)
|
||||
->method('dispatch')
|
||||
->with(
|
||||
$this->callback(function ($arg) use ($job) {
|
||||
return $arg instanceof ProcessingEvent && $arg->getJob() === $job;
|
||||
}),
|
||||
$this->callback(function ($arg) use ($matcher) {
|
||||
switch ($matcher->numberOfInvocations()) {
|
||||
case 1:
|
||||
return $arg === Events::BEFORE_PROCESSING;
|
||||
case 2:
|
||||
return $arg === Events::AFTER_PROCESSING;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
$this->callback(fn($arg) => $arg instanceof ProcessingEvent && $arg->getJob() === $job),
|
||||
$this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
|
||||
1 => $arg === Events::BEFORE_PROCESSING,
|
||||
2 => $arg === Events::AFTER_PROCESSING,
|
||||
default => false,
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
@@ -39,15 +39,10 @@ class MessengerDataflowRunnerTest extends TestCase
|
||||
$this->repository
|
||||
->expects($matcher)
|
||||
->method('save')
|
||||
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
|
||||
switch ($matcher->numberOfInvocations()) {
|
||||
case 1:
|
||||
return $arg === $job1;
|
||||
case 2:
|
||||
return $arg === $job2;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
|
||||
1 => $arg === $job1,
|
||||
2 => $arg === $job2,
|
||||
default => false,
|
||||
}))
|
||||
;
|
||||
|
||||
@@ -55,15 +50,10 @@ class MessengerDataflowRunnerTest extends TestCase
|
||||
$this->bus
|
||||
->expects($matcher)
|
||||
->method('dispatch')
|
||||
->with($this->callback(function ($arg) use ($matcher, $id1, $id2) {
|
||||
switch ($matcher->numberOfInvocations()) {
|
||||
case 1:
|
||||
return $arg instanceof JobMessage && $arg->getJobId() === $id1;
|
||||
case 2:
|
||||
return $arg instanceof JobMessage && $arg->getJobId() === $id2;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
|
||||
1 => $arg instanceof JobMessage && $arg->getJobId() === $id1,
|
||||
2 => $arg instanceof JobMessage && $arg->getJobId() === $id2,
|
||||
default => false,
|
||||
}))
|
||||
->willReturnOnConsecutiveCalls(
|
||||
new Envelope(new JobMessage($id1)),
|
||||
|
||||
@@ -38,15 +38,10 @@ class PendingDataflowRunnerTest extends TestCase
|
||||
$this->processor
|
||||
->expects($matcher)
|
||||
->method('process')
|
||||
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
|
||||
switch ($matcher->numberOfInvocations()) {
|
||||
case 1:
|
||||
return $arg === $job1;
|
||||
case 2:
|
||||
return $arg === $job2;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
->with($this->callback(fn($arg) => match ($matcher->numberOfInvocations()) {
|
||||
1 => $arg === $job1,
|
||||
2 => $arg === $job2,
|
||||
default => false,
|
||||
}))
|
||||
;
|
||||
|
||||
|
||||
@@ -60,9 +60,10 @@
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/amp": "^2.5",
|
||||
"friendsofphp/php-cs-fixer": "^3.75",
|
||||
"phpunit/phpunit": "^11",
|
||||
"portphp/portphp": "^1.9",
|
||||
"rector/rector": "^1.0",
|
||||
"rector/rector": "^2.0",
|
||||
"symfony/messenger": "^7.0"
|
||||
},
|
||||
"suggest": {
|
||||
|
||||
@@ -17,7 +17,7 @@ return static function (RectorConfig $rectorConfig): void {
|
||||
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
|
||||
|
||||
$rectorConfig->sets([
|
||||
SymfonySetList::SYMFONY_60,
|
||||
SymfonySetList::SYMFONY_70,
|
||||
SymfonySetList::SYMFONY_CODE_QUALITY,
|
||||
SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION,
|
||||
LevelSetList::UP_TO_PHP_80,
|
||||
|
||||
@@ -27,29 +27,27 @@ class AddScheduledDataflowCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setHelp('The <info>%command.name%</info> allows you to create a new scheduled dataflow.')
|
||||
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
|
||||
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
|
||||
->addOption('options', null, InputOption::VALUE_OPTIONAL,
|
||||
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
|
||||
->addOption(
|
||||
'options',
|
||||
null,
|
||||
InputOption::VALUE_OPTIONAL,
|
||||
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})'
|
||||
)
|
||||
->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
|
||||
->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
|
||||
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow')
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
$choices = [];
|
||||
@@ -71,13 +69,17 @@ class AddScheduledDataflowCommand extends Command
|
||||
}
|
||||
$options = $input->getOption('options');
|
||||
if (!$options) {
|
||||
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
|
||||
json_encode([]));
|
||||
$options = $io->ask(
|
||||
'What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
|
||||
json_encode([])
|
||||
);
|
||||
}
|
||||
$frequency = $input->getOption('frequency');
|
||||
if (!$frequency) {
|
||||
$frequency = $io->choice('What is the frequency for the scheduled dataflow?',
|
||||
ScheduledDataflow::AVAILABLE_FREQUENCIES);
|
||||
$frequency = $io->choice(
|
||||
'What is the frequency for the scheduled dataflow?',
|
||||
ScheduledDataflow::AVAILABLE_FREQUENCIES
|
||||
);
|
||||
}
|
||||
$firstRun = $input->getOption('first_run');
|
||||
if (!$firstRun) {
|
||||
@@ -92,22 +94,25 @@ class AddScheduledDataflowCommand extends Command
|
||||
'id' => null,
|
||||
'label' => $label,
|
||||
'dataflow_type' => $type,
|
||||
'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
|
||||
'options' => json_decode($options, true, 512, \JSON_THROW_ON_ERROR),
|
||||
'frequency' => $frequency,
|
||||
'next' => new \DateTime($firstRun),
|
||||
'enabled' => $enabled,
|
||||
]);
|
||||
|
||||
$errors = $this->validator->validate($newScheduledDataflow);
|
||||
if (count($errors) > 0) {
|
||||
if (\count($errors) > 0) {
|
||||
$io->error((string) $errors);
|
||||
|
||||
return 2;
|
||||
}
|
||||
|
||||
$this->scheduledDataflowRepository->save($newScheduledDataflow);
|
||||
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.',
|
||||
$newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
|
||||
$io->success(\sprintf(
|
||||
'New scheduled dataflow "%s" (id:%d) was created successfully.',
|
||||
$newScheduledDataflow->getLabel(),
|
||||
$newScheduledDataflow->getId()
|
||||
));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -26,9 +26,6 @@ class ChangeScheduleStatusCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
@@ -39,12 +36,9 @@ class ChangeScheduleStatusCommand extends Command
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
@@ -52,7 +46,7 @@ class ChangeScheduleStatusCommand extends Command
|
||||
$schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id'));
|
||||
|
||||
if (!$schedule) {
|
||||
$io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
|
||||
$io->error(\sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
|
||||
|
||||
return 1;
|
||||
}
|
||||
@@ -71,9 +65,9 @@ class ChangeScheduleStatusCommand extends Command
|
||||
try {
|
||||
$schedule->setEnabled($input->getOption('enable'));
|
||||
$this->scheduledDataflowRepository->save($schedule);
|
||||
$io->success(sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
|
||||
$io->success(\sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
|
||||
} catch (\Exception $e) {
|
||||
$io->error(sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
|
||||
$io->error(\sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
|
||||
|
||||
return 4;
|
||||
}
|
||||
|
||||
@@ -27,9 +27,6 @@ class DatabaseSchemaCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
@@ -43,7 +40,7 @@ class DatabaseSchemaCommand extends Command
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
|
||||
@@ -61,7 +58,7 @@ class DatabaseSchemaCommand extends Command
|
||||
$tables = [];
|
||||
foreach ($sm->listTables() as $table) {
|
||||
/** @var Table $table */
|
||||
if (in_array($table->getName(), $tableArray)) {
|
||||
if (\in_array($table->getName(), $tableArray)) {
|
||||
$tables[] = $table;
|
||||
}
|
||||
}
|
||||
@@ -90,14 +87,14 @@ class DatabaseSchemaCommand extends Command
|
||||
if ($input->getOption('dump-sql')) {
|
||||
$io->text('Execute these SQL Queries on your database:');
|
||||
foreach ($sqls as $sql) {
|
||||
$io->text($sql . ';');
|
||||
$io->text($sql.';');
|
||||
}
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
|
||||
if (!$io->askQuestion(new ConfirmationQuestion('Are you sure to update database ?', true))) {
|
||||
$io->text("Execution canceled.");
|
||||
$io->text('Execution canceled.');
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
@@ -106,7 +103,7 @@ class DatabaseSchemaCommand extends Command
|
||||
$connection->executeQuery($sql);
|
||||
}
|
||||
|
||||
$io->success(sprintf('%d queries executed.', \count($sqls)));
|
||||
$io->success(\sprintf('%d queries executed.', \count($sqls)));
|
||||
|
||||
return parent::SUCCESS;
|
||||
}
|
||||
|
||||
@@ -4,8 +4,10 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\AutoUpdateCountInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
@@ -26,18 +28,16 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
|
||||
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory, private JobRepository $jobRepository)
|
||||
{
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setHelp(<<<'EOF'
|
||||
->setHelp(
|
||||
<<<'EOF'
|
||||
The <info>%command.name%</info> command runs one dataflow with the provided options.
|
||||
|
||||
<info>php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}'</info>
|
||||
@@ -48,19 +48,20 @@ EOF
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
$fqcnOrAlias = $input->getArgument('fqcn');
|
||||
$options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR);
|
||||
$options = json_decode($input->getArgument('options'), true, 512, \JSON_THROW_ON_ERROR);
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
|
||||
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
|
||||
if ($dataflowType instanceof AutoUpdateCountInterface) {
|
||||
$dataflowType->setRepository($this->jobRepository);
|
||||
}
|
||||
|
||||
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
|
||||
$dataflowType->setLogger($this->logger);
|
||||
}
|
||||
|
||||
@@ -31,9 +31,6 @@ class JobShowCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
@@ -44,12 +41,9 @@ class JobShowCommand extends Command
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
|
||||
@@ -73,7 +67,7 @@ class JobShowCommand extends Command
|
||||
return 2;
|
||||
}
|
||||
|
||||
if (null === $job) {
|
||||
if ($job === null) {
|
||||
$io->error('Cannot find job :/');
|
||||
|
||||
return 3;
|
||||
@@ -87,19 +81,19 @@ class JobShowCommand extends Command
|
||||
['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
|
||||
['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
|
||||
['Object number', $job->getCount()],
|
||||
['Errors', count((array) $job->getExceptions())],
|
||||
['Errors', \count((array) $job->getExceptions())],
|
||||
['Status', $this->translateStatus($job->getStatus())],
|
||||
];
|
||||
if ($input->getOption('details')) {
|
||||
$display[] = ['Type', $job->getDataflowType()];
|
||||
$display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)];
|
||||
$display[] = ['Options', json_encode($job->getOptions(), \JSON_THROW_ON_ERROR)];
|
||||
$io->section('Summary');
|
||||
}
|
||||
|
||||
$io->table(['Field', 'Value'], $display);
|
||||
if ($input->getOption('details')) {
|
||||
$io->section('Exceptions');
|
||||
$exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
|
||||
$exceptions = array_map(fn (string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
|
||||
|
||||
$io->write($exceptions);
|
||||
}
|
||||
|
||||
@@ -29,22 +29,17 @@ class RunPendingDataflowsCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->setHelp(<<<'EOF'
|
||||
->setHelp(
|
||||
<<<'EOF'
|
||||
The <info>%command.name%</info> command runs dataflows according to the schedule defined in the UI by the user.
|
||||
EOF
|
||||
)
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (!$this->lock()) {
|
||||
@@ -53,7 +48,7 @@ EOF
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ class ScheduleListCommand extends Command
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
@@ -34,12 +31,9 @@ class ScheduleListCommand extends Command
|
||||
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
if (null !== $input->getOption('connection')) {
|
||||
if ($input->getOption('connection') !== null) {
|
||||
$this->connectionFactory->setConnectionName($input->getOption('connection'));
|
||||
}
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
|
||||
@@ -4,13 +4,6 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
|
||||
use Doctrine\DBAL\Schema\Comparator;
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\DBAL\Schema\Table;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\ArrayInput;
|
||||
@@ -21,14 +14,12 @@ use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*
|
||||
* @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead.
|
||||
*/
|
||||
#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')]
|
||||
class SchemaCommand extends Command
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
@@ -38,9 +29,6 @@ class SchemaCommand extends Command
|
||||
;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
@@ -48,9 +36,9 @@ class SchemaCommand extends Command
|
||||
|
||||
$options = array_filter($input->getOptions());
|
||||
|
||||
//add -- before each keys
|
||||
// add -- before each keys
|
||||
$options = array_combine(
|
||||
array_map(fn($key) => '--' . $key, array_keys($options)),
|
||||
array_map(fn ($key) => '--'.$key, array_keys($options)),
|
||||
array_values($options)
|
||||
);
|
||||
|
||||
@@ -58,7 +46,7 @@ class SchemaCommand extends Command
|
||||
|
||||
$inputArray = new ArrayInput([
|
||||
'command' => 'code-rhapsodie:dataflow:database-schema',
|
||||
...$options
|
||||
...$options,
|
||||
]);
|
||||
|
||||
return $this->getApplication()->doRun($inputArray, $output);
|
||||
|
||||
@@ -7,7 +7,6 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
class AMPAsyncDataflowBuilder extends DataflowBuilder
|
||||
{
|
||||
|
||||
@@ -4,15 +4,20 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
|
||||
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
private JobRepository $repository;
|
||||
|
||||
private ?\DateTime $saveDate = null;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
@@ -21,14 +26,24 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
|
||||
return [];
|
||||
}
|
||||
|
||||
public function process(array $options): Result
|
||||
public function process(array $options, ?int $jobId = null): Result
|
||||
{
|
||||
$this->saveDate = new \DateTime('+1 minute');
|
||||
|
||||
$optionsResolver = new OptionsResolver();
|
||||
$this->configureOptions($optionsResolver);
|
||||
$options = $optionsResolver->resolve($options);
|
||||
|
||||
$builder = $this->createDataflowBuilder();
|
||||
$builder->setName($this->getLabel());
|
||||
$builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) {
|
||||
if ($jobId === null || $this->saveDate > new \DateTime()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->repository->updateCount($jobId, $count);
|
||||
$this->saveDate = new \DateTime('+1 minute');
|
||||
});
|
||||
$this->buildDataflow($builder, $options);
|
||||
$dataflow = $builder->getDataflow();
|
||||
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
|
||||
@@ -38,6 +53,11 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
|
||||
return $dataflow->process();
|
||||
}
|
||||
|
||||
public function setRepository(JobRepository $repository): void
|
||||
{
|
||||
$this->repository = $repository;
|
||||
}
|
||||
|
||||
protected function createDataflowBuilder(): DataflowBuilder
|
||||
{
|
||||
return new DataflowBuilder();
|
||||
|
||||
12
src/DataflowType/AutoUpdateCountInterface.php
Normal file
12
src/DataflowType/AutoUpdateCountInterface.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
|
||||
interface AutoUpdateCountInterface
|
||||
{
|
||||
public function setRepository(JobRepository $repository): void;
|
||||
}
|
||||
@@ -5,18 +5,19 @@ declare(strict_types=1);
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
|
||||
|
||||
use function Amp\coroutine;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Delayed;
|
||||
use Amp\Loop;
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use RuntimeException;
|
||||
use Throwable;
|
||||
|
||||
class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
{
|
||||
@@ -34,8 +35,8 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
|
||||
public function __construct(private iterable $reader, private ?string $name, private ?int $loopInterval = 0, private ?int $emitInterval = 0)
|
||||
{
|
||||
if (!function_exists('Amp\\Promise\\wait')) {
|
||||
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
|
||||
if (!\function_exists('Amp\\Promise\\wait')) {
|
||||
throw new \RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,9 +62,6 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function process(): Result
|
||||
{
|
||||
$count = 0;
|
||||
@@ -76,7 +74,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
}
|
||||
|
||||
$deferred = new Deferred();
|
||||
$resolved = false; //missing $deferred->isResolved() in version 2.5
|
||||
$resolved = false; // missing $deferred->isResolved() in version 2.5
|
||||
$producer = new Producer(function (callable $emit) {
|
||||
foreach ($this->reader as $index => $item) {
|
||||
yield new Delayed($this->emitInterval);
|
||||
@@ -89,7 +87,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
$it = $producer->getCurrent();
|
||||
[$index, $item] = $it;
|
||||
$this->states[$index] = [$index, 0, $item];
|
||||
} elseif (!$resolved && 0 === count($this->states)) {
|
||||
} elseif (!$resolved && \count($this->states) === 0) {
|
||||
$resolved = true;
|
||||
$deferred->resolve();
|
||||
}
|
||||
@@ -120,20 +118,20 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
private function processState(mixed $state, int &$count, array &$exceptions): void
|
||||
{
|
||||
[$readIndex, $stepIndex, $item] = $state;
|
||||
if ($stepIndex < count($this->steps)) {
|
||||
if ($stepIndex < \count($this->steps)) {
|
||||
if (!isset($this->stepsJobs[$stepIndex])) {
|
||||
$this->stepsJobs[$stepIndex] = [];
|
||||
}
|
||||
[$step, $scale] = $this->steps[$stepIndex];
|
||||
if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
|
||||
if ((is_countable($this->stepsJobs[$stepIndex]) ? \count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
|
||||
$this->stepsJobs[$stepIndex][$readIndex] = true;
|
||||
/** @var Promise<void> $promise */
|
||||
$promise = coroutine($step)($item);
|
||||
$promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
|
||||
$promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
|
||||
if ($exception) {
|
||||
$exceptions[$stepIndex] = $exception;
|
||||
$this->logException($exception, (string) $stepIndex);
|
||||
} elseif (false === $newItem) {
|
||||
} elseif ($newItem === false) {
|
||||
unset($this->states[$readIndex]);
|
||||
} else {
|
||||
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
|
||||
@@ -153,7 +151,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
|
||||
}
|
||||
}
|
||||
|
||||
private function logException(Throwable $e, ?string $index = null): void
|
||||
private function logException(\Throwable $e, ?string $index = null): void
|
||||
{
|
||||
if (!isset($this->logger)) {
|
||||
return;
|
||||
|
||||
@@ -21,6 +21,13 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
|
||||
private ?\Closure $customExceptionIndex = null;
|
||||
|
||||
private ?\DateTimeInterface $dateTime = null;
|
||||
|
||||
/**
|
||||
* @var \Closure[]
|
||||
*/
|
||||
private array $afterItemProcessors = [];
|
||||
|
||||
public function __construct(private iterable $reader, private ?string $name)
|
||||
{
|
||||
}
|
||||
@@ -56,8 +63,15 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
* @param array<callable> $processors
|
||||
*/
|
||||
public function setAfterItemProcessors(array $processors): self
|
||||
{
|
||||
$this->afterItemProcessors = array_map(fn (callable $callable) => \Closure::fromCallable($callable), $processors);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function process(): Result
|
||||
{
|
||||
$count = 0;
|
||||
@@ -75,7 +89,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
} catch (\Throwable $e) {
|
||||
$exceptionIndex = $index;
|
||||
try {
|
||||
if (is_callable($this->customExceptionIndex)) {
|
||||
if (\is_callable($this->customExceptionIndex)) {
|
||||
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
|
||||
}
|
||||
} catch (\Throwable $e2) {
|
||||
@@ -87,6 +101,10 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
}
|
||||
|
||||
++$count;
|
||||
|
||||
foreach ($this->afterItemProcessors as $afterItemProcessor) {
|
||||
$afterItemProcessor($index, $item, $count);
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($this->writers as $writer) {
|
||||
@@ -103,9 +121,9 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
private function processItem(mixed $item): void
|
||||
{
|
||||
foreach ($this->steps as $step) {
|
||||
$item = call_user_func($step, $item);
|
||||
$item = \call_user_func($step, $item);
|
||||
|
||||
if (false === $item) {
|
||||
if ($item === false) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,10 @@ class DataflowBuilder
|
||||
private array $writers = [];
|
||||
|
||||
private ?\Closure $customExceptionIndex = null;
|
||||
/**
|
||||
* @var \Closure[]
|
||||
*/
|
||||
private array $afterItemProcessors = [];
|
||||
|
||||
public function setName(string $name): self
|
||||
{
|
||||
@@ -54,6 +58,13 @@ class DataflowBuilder
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function addAfterItemProcessor(callable $callable): self
|
||||
{
|
||||
$this->afterItemProcessors[] = \Closure::fromCallable($callable);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getDataflow(): DataflowInterface
|
||||
{
|
||||
$dataflow = new Dataflow($this->reader, $this->name);
|
||||
@@ -69,10 +80,12 @@ class DataflowBuilder
|
||||
$dataflow->addWriter($writer);
|
||||
}
|
||||
|
||||
if (is_callable($this->customExceptionIndex)) {
|
||||
if (\is_callable($this->customExceptionIndex)) {
|
||||
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
|
||||
}
|
||||
|
||||
$dataflow->setAfterItemProcessors($this->afterItemProcessors);
|
||||
|
||||
return $dataflow;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,5 +10,5 @@ interface DataflowTypeInterface
|
||||
|
||||
public function getAliases(): iterable;
|
||||
|
||||
public function process(array $options): Result;
|
||||
public function process(array $options, ?int $jobId = null): Result;
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ class Result
|
||||
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
|
||||
{
|
||||
$this->elapsed = $startTime->diff($endTime);
|
||||
$this->errorCount = count($exceptions);
|
||||
$this->errorCount = \count($exceptions);
|
||||
$this->successCount = $totalProcessedCount - $this->errorCount;
|
||||
$this->exceptions = $exceptions;
|
||||
}
|
||||
|
||||
@@ -18,21 +18,15 @@ class CollectionWriter implements DelegateWriterInterface
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare()
|
||||
{
|
||||
$this->writer->prepare();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write($collection)
|
||||
{
|
||||
if (!is_iterable($collection)) {
|
||||
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
|
||||
throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
|
||||
}
|
||||
|
||||
foreach ($collection as $item) {
|
||||
@@ -40,17 +34,11 @@ class CollectionWriter implements DelegateWriterInterface
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function finish()
|
||||
{
|
||||
$this->writer->finish();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function supports($item): bool
|
||||
{
|
||||
return is_iterable($item);
|
||||
|
||||
@@ -11,8 +11,6 @@ interface DelegateWriterInterface extends WriterInterface
|
||||
{
|
||||
/**
|
||||
* Returns true if the argument is of a supported type.
|
||||
*
|
||||
* @param $item
|
||||
*/
|
||||
public function supports($item): bool;
|
||||
}
|
||||
|
||||
@@ -21,9 +21,6 @@ class DelegatorWriter implements DelegateWriterInterface
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare()
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
@@ -31,9 +28,6 @@ class DelegatorWriter implements DelegateWriterInterface
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write($item)
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
@@ -46,12 +40,9 @@ class DelegatorWriter implements DelegateWriterInterface
|
||||
return;
|
||||
}
|
||||
|
||||
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
|
||||
throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function finish()
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
@@ -59,9 +50,6 @@ class DelegatorWriter implements DelegateWriterInterface
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function supports($item): bool
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
|
||||
@@ -12,9 +12,6 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class BusCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function process(ContainerBuilder $container)
|
||||
{
|
||||
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
|
||||
@@ -23,7 +20,7 @@ class BusCompilerPass implements CompilerPassInterface
|
||||
|
||||
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
|
||||
if (!$container->has($bus)) {
|
||||
throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
|
||||
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $bus));
|
||||
}
|
||||
|
||||
if (!$container->has(MessengerDataflowRunner::class)) {
|
||||
|
||||
@@ -16,9 +16,6 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
*/
|
||||
class DataflowTypeCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function process(ContainerBuilder $container)
|
||||
{
|
||||
if (!$container->has(DataflowTypeRegistry::class)) {
|
||||
|
||||
@@ -12,9 +12,6 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class DefaultLoggerCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function process(ContainerBuilder $container)
|
||||
{
|
||||
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
|
||||
|
||||
@@ -10,7 +10,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
|
||||
|
||||
class Configuration implements ConfigurationInterface
|
||||
{
|
||||
public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
|
||||
public function getConfigTreeBuilder(): TreeBuilder
|
||||
{
|
||||
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
|
||||
$rootNode = $treeBuilder->getRootNode();
|
||||
@@ -34,7 +34,7 @@ class Configuration implements ConfigurationInterface
|
||||
->end()
|
||||
->end()
|
||||
->validate()
|
||||
->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
|
||||
->ifTrue(static fn ($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
|
||||
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
|
||||
->end()
|
||||
->end()
|
||||
|
||||
@@ -75,19 +75,19 @@ class Job
|
||||
public static function createFromArray(array $datas)
|
||||
{
|
||||
$lost = array_diff(static::KEYS, array_keys($datas));
|
||||
if (count($lost) > 0) {
|
||||
if (\count($lost) > 0) {
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
|
||||
}
|
||||
|
||||
$job = new self();
|
||||
$job->id = null === $datas['id'] ? null : (int) $datas['id'];
|
||||
$job->setStatus(null === $datas['status'] ? null : (int) $datas['status']);
|
||||
$job->id = $datas['id'] === null ? null : (int) $datas['id'];
|
||||
$job->setStatus($datas['status'] === null ? null : (int) $datas['status']);
|
||||
$job->setLabel($datas['label']);
|
||||
$job->setDataflowType($datas['dataflow_type']);
|
||||
$job->setOptions($datas['options']);
|
||||
$job->setRequestedDate($datas['requested_date']);
|
||||
$job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']);
|
||||
$job->setCount(null === $datas['count'] ? null : (int) $datas['count']);
|
||||
$job->setScheduledDataflowId($datas['scheduled_dataflow_id'] === null ? null : (int) $datas['scheduled_dataflow_id']);
|
||||
$job->setCount($datas['count'] === null ? null : (int) $datas['count']);
|
||||
$job->setExceptions($datas['exceptions']);
|
||||
$job->setStartTime($datas['start_time']);
|
||||
$job->setEndTime($datas['end_time']);
|
||||
@@ -112,7 +112,7 @@ class Job
|
||||
];
|
||||
}
|
||||
|
||||
public function setId(int $id): Job
|
||||
public function setId(int $id): self
|
||||
{
|
||||
$this->id = $id;
|
||||
|
||||
@@ -129,7 +129,7 @@ class Job
|
||||
return $this->status;
|
||||
}
|
||||
|
||||
public function setStatus(int $status): Job
|
||||
public function setStatus(int $status): self
|
||||
{
|
||||
$this->status = $status;
|
||||
|
||||
@@ -141,7 +141,7 @@ class Job
|
||||
return $this->label;
|
||||
}
|
||||
|
||||
public function setLabel(?string $label): Job
|
||||
public function setLabel(?string $label): self
|
||||
{
|
||||
$this->label = $label;
|
||||
|
||||
@@ -153,7 +153,7 @@ class Job
|
||||
return $this->dataflowType;
|
||||
}
|
||||
|
||||
public function setDataflowType(?string $dataflowType): Job
|
||||
public function setDataflowType(?string $dataflowType): self
|
||||
{
|
||||
$this->dataflowType = $dataflowType;
|
||||
|
||||
@@ -165,7 +165,7 @@ class Job
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
public function setOptions(?array $options): Job
|
||||
public function setOptions(?array $options): self
|
||||
{
|
||||
$this->options = $options;
|
||||
|
||||
@@ -177,7 +177,7 @@ class Job
|
||||
return $this->requestedDate;
|
||||
}
|
||||
|
||||
public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
|
||||
public function setRequestedDate(?\DateTimeInterface $requestedDate): self
|
||||
{
|
||||
$this->requestedDate = $requestedDate;
|
||||
|
||||
@@ -189,7 +189,7 @@ class Job
|
||||
return $this->scheduledDataflowId;
|
||||
}
|
||||
|
||||
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
|
||||
public function setScheduledDataflowId(?int $scheduledDataflowId): self
|
||||
{
|
||||
$this->scheduledDataflowId = $scheduledDataflowId;
|
||||
|
||||
@@ -201,7 +201,7 @@ class Job
|
||||
return $this->count;
|
||||
}
|
||||
|
||||
public function setCount(?int $count): Job
|
||||
public function setCount(?int $count): self
|
||||
{
|
||||
$this->count = $count;
|
||||
|
||||
@@ -213,7 +213,7 @@ class Job
|
||||
return $this->exceptions;
|
||||
}
|
||||
|
||||
public function setExceptions(?array $exceptions): Job
|
||||
public function setExceptions(?array $exceptions): self
|
||||
{
|
||||
$this->exceptions = $exceptions;
|
||||
|
||||
@@ -225,7 +225,7 @@ class Job
|
||||
return $this->startTime;
|
||||
}
|
||||
|
||||
public function setStartTime(?\DateTimeInterface $startTime): Job
|
||||
public function setStartTime(?\DateTimeInterface $startTime): self
|
||||
{
|
||||
$this->startTime = $startTime;
|
||||
|
||||
@@ -237,7 +237,7 @@ class Job
|
||||
return $this->endTime;
|
||||
}
|
||||
|
||||
public function setEndTime(?\DateTimeInterface $endTime): Job
|
||||
public function setEndTime(?\DateTimeInterface $endTime): self
|
||||
{
|
||||
$this->endTime = $endTime;
|
||||
|
||||
|
||||
@@ -50,19 +50,19 @@ class ScheduledDataflow
|
||||
public static function createFromArray(array $datas)
|
||||
{
|
||||
$lost = array_diff(static::KEYS, array_keys($datas));
|
||||
if (count($lost) > 0) {
|
||||
if (\count($lost) > 0) {
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
|
||||
}
|
||||
|
||||
$scheduledDataflow = new self();
|
||||
$scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id'];
|
||||
$scheduledDataflow->id = $datas['id'] === null ? null : (int) $datas['id'];
|
||||
|
||||
$scheduledDataflow->setLabel($datas['label']);
|
||||
$scheduledDataflow->setDataflowType($datas['dataflow_type']);
|
||||
$scheduledDataflow->setOptions($datas['options']);
|
||||
$scheduledDataflow->setFrequency($datas['frequency']);
|
||||
$scheduledDataflow->setNext($datas['next']);
|
||||
$scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']);
|
||||
$scheduledDataflow->setEnabled($datas['enabled'] === null ? null : (bool) $datas['enabled']);
|
||||
|
||||
return $scheduledDataflow;
|
||||
}
|
||||
@@ -80,7 +80,7 @@ class ScheduledDataflow
|
||||
];
|
||||
}
|
||||
|
||||
public function setId(int $id): ScheduledDataflow
|
||||
public function setId(int $id): self
|
||||
{
|
||||
$this->id = $id;
|
||||
|
||||
@@ -97,7 +97,7 @@ class ScheduledDataflow
|
||||
return $this->label;
|
||||
}
|
||||
|
||||
public function setLabel(?string $label): ScheduledDataflow
|
||||
public function setLabel(?string $label): self
|
||||
{
|
||||
$this->label = $label;
|
||||
|
||||
@@ -109,7 +109,7 @@ class ScheduledDataflow
|
||||
return $this->dataflowType;
|
||||
}
|
||||
|
||||
public function setDataflowType(?string $dataflowType): ScheduledDataflow
|
||||
public function setDataflowType(?string $dataflowType): self
|
||||
{
|
||||
$this->dataflowType = $dataflowType;
|
||||
|
||||
@@ -121,7 +121,7 @@ class ScheduledDataflow
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
public function setOptions(?array $options): ScheduledDataflow
|
||||
public function setOptions(?array $options): self
|
||||
{
|
||||
$this->options = $options;
|
||||
|
||||
@@ -133,7 +133,7 @@ class ScheduledDataflow
|
||||
return $this->frequency;
|
||||
}
|
||||
|
||||
public function setFrequency(?string $frequency): ScheduledDataflow
|
||||
public function setFrequency(?string $frequency): self
|
||||
{
|
||||
$this->frequency = $frequency;
|
||||
|
||||
@@ -145,7 +145,7 @@ class ScheduledDataflow
|
||||
return $this->next;
|
||||
}
|
||||
|
||||
public function setNext(?\DateTimeInterface $next): ScheduledDataflow
|
||||
public function setNext(?\DateTimeInterface $next): self
|
||||
{
|
||||
$this->next = $next;
|
||||
|
||||
@@ -157,7 +157,7 @@ class ScheduledDataflow
|
||||
return $this->enabled;
|
||||
}
|
||||
|
||||
public function setEnabled(?bool $enabled): ScheduledDataflow
|
||||
public function setEnabled(?bool $enabled): self
|
||||
{
|
||||
$this->enabled = $enabled;
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ class UnknownDataflowTypeException extends \Exception
|
||||
{
|
||||
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
|
||||
{
|
||||
return new self(sprintf(
|
||||
return new self(\sprintf(
|
||||
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
|
||||
$aliasOrFqcn,
|
||||
implode(', ', $knownDataflowTypes)
|
||||
|
||||
@@ -24,6 +24,6 @@ class ConnectionFactory
|
||||
|
||||
public function getConnection(): \Doctrine\DBAL\Connection
|
||||
{
|
||||
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
|
||||
return $this->container->get(\sprintf('doctrine.dbal.%s_connection', $this->connectionName));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ namespace CodeRhapsodie\DataflowBundle\Logger;
|
||||
use Monolog\Formatter\FormatterInterface;
|
||||
use Monolog\Formatter\LineFormatter;
|
||||
use Monolog\Handler\AbstractProcessingHandler;
|
||||
use Monolog\Logger;
|
||||
use Monolog\LogRecord;
|
||||
|
||||
class BufferHandler extends AbstractProcessingHandler
|
||||
|
||||
@@ -16,7 +16,7 @@ final class DelegatingLogger extends AbstractLogger
|
||||
{
|
||||
foreach ($loggers as $logger) {
|
||||
if (!$logger instanceof LoggerInterface) {
|
||||
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
|
||||
throw new \InvalidArgumentException(\sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
|
||||
}
|
||||
|
||||
$this->loggers[] = $logger;
|
||||
|
||||
@@ -19,15 +19,12 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function createJobsFromScheduledDataflows(): void
|
||||
{
|
||||
$this->connection->beginTransaction();
|
||||
try {
|
||||
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
|
||||
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
|
||||
if ($this->jobRepository->findPendingForScheduledDataflow($scheduled) !== null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Processor;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Event\Events;
|
||||
@@ -30,6 +31,10 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
|
||||
$this->beforeProcessing($job);
|
||||
|
||||
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
|
||||
if ($dataflowType instanceof RepositoryInterface) {
|
||||
$dataflowType->setRepository($this->repository);
|
||||
}
|
||||
|
||||
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
|
||||
if (isset($this->logger)) {
|
||||
$loggers[] = $this->logger;
|
||||
@@ -40,7 +45,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
|
||||
$dataflowType->setLogger($logger);
|
||||
}
|
||||
|
||||
$result = $dataflowType->process($job->getOptions());
|
||||
$result = $dataflowType->process($job->getOptions(), $job->getId());
|
||||
|
||||
if (!$dataflowType instanceof LoggerAwareInterface) {
|
||||
foreach ($result->getExceptions() as $index => $e) {
|
||||
|
||||
@@ -18,9 +18,6 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
|
||||
/** @var array|DataflowTypeInterface[] */
|
||||
private array $aliasesRegistry = [];
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface
|
||||
{
|
||||
if (isset($this->fqcnRegistry[$fqcnOrAlias])) {
|
||||
@@ -34,17 +31,11 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
|
||||
throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function listDataflowTypes(): iterable
|
||||
{
|
||||
return $this->fqcnRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function registerDataflowType(DataflowTypeInterface $dataflowType): void
|
||||
{
|
||||
$this->fqcnRegistry[$dataflowType::class] = $dataflowType;
|
||||
|
||||
@@ -14,7 +14,7 @@ trait InitFromDbTrait
|
||||
private function initDateTime(array $datas): array
|
||||
{
|
||||
foreach ($this->getFields() as $key => $type) {
|
||||
if ('datetime' === $type && null !== $datas[$key]) {
|
||||
if ($type === 'datetime' && $datas[$key] !== null) {
|
||||
$datas[$key] = new \DateTime($datas[$key]);
|
||||
}
|
||||
}
|
||||
@@ -24,10 +24,10 @@ trait InitFromDbTrait
|
||||
|
||||
private function initArray(array $datas): array
|
||||
{
|
||||
if (!is_array($datas['options'])) {
|
||||
if (!\is_array($datas['options'])) {
|
||||
$datas['options'] = $this->strToArray($datas['options']);
|
||||
}
|
||||
if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) {
|
||||
if (\array_key_exists('exceptions', $datas) && !\is_array($datas['exceptions'])) {
|
||||
$datas['exceptions'] = $this->strToArray($datas['exceptions']);
|
||||
}
|
||||
|
||||
@@ -36,12 +36,12 @@ trait InitFromDbTrait
|
||||
|
||||
private function strToArray($value): array
|
||||
{
|
||||
if (null === $value) {
|
||||
if ($value === null) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$array = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
|
||||
$array = json_decode($value, true, 512, \JSON_THROW_ON_ERROR);
|
||||
|
||||
return (false === $array) ? [] : $array;
|
||||
return ($array === false) ? [] : $array;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ class JobRepository
|
||||
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
@@ -93,7 +93,7 @@ class JobRepository
|
||||
->orderBy('requested_date', 'DESC')
|
||||
->setMaxResults(20);
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
@@ -108,7 +108,7 @@ class JobRepository
|
||||
->orderBy('requested_date', 'DESC')
|
||||
->setMaxResults(20);
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
@@ -121,14 +121,14 @@ class JobRepository
|
||||
$datas = $job->toArray();
|
||||
unset($datas['id']);
|
||||
|
||||
if (is_array($datas['options'])) {
|
||||
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
|
||||
if (\is_array($datas['options'])) {
|
||||
$datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
|
||||
}
|
||||
if (is_array($datas['exceptions'])) {
|
||||
$datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR);
|
||||
if (\is_array($datas['exceptions'])) {
|
||||
$datas['exceptions'] = json_encode($datas['exceptions'], \JSON_THROW_ON_ERROR);
|
||||
}
|
||||
|
||||
if (null === $job->getId()) {
|
||||
if ($job->getId() === null) {
|
||||
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
|
||||
$job->setId((int) $this->connection->lastInsertId());
|
||||
|
||||
@@ -137,6 +137,11 @@ class JobRepository
|
||||
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields());
|
||||
}
|
||||
|
||||
public function updateCount(int $jobId, int $count): void
|
||||
{
|
||||
$this->connection->update(static::TABLE_NAME, ['count' => $count], ['id' => $jobId]);
|
||||
}
|
||||
|
||||
public function createQueryBuilder($alias = null): QueryBuilder
|
||||
{
|
||||
$qb = $this->connection->createQueryBuilder();
|
||||
@@ -149,7 +154,7 @@ class JobRepository
|
||||
private function returnFirstOrNull(QueryBuilder $qb): ?Job
|
||||
{
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ class ScheduledDataflowRepository
|
||||
;
|
||||
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
@@ -65,7 +65,7 @@ class ScheduledDataflowRepository
|
||||
$qb->orderBy('label', 'ASC');
|
||||
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
@@ -90,11 +90,11 @@ class ScheduledDataflowRepository
|
||||
$datas = $scheduledDataflow->toArray();
|
||||
unset($datas['id']);
|
||||
|
||||
if (is_array($datas['options'])) {
|
||||
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
|
||||
if (\is_array($datas['options'])) {
|
||||
$datas['options'] = json_encode($datas['options'], \JSON_THROW_ON_ERROR);
|
||||
}
|
||||
|
||||
if (null === $scheduledDataflow->getId()) {
|
||||
if ($scheduledDataflow->getId() === null) {
|
||||
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
|
||||
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
|
||||
|
||||
@@ -129,7 +129,7 @@ class ScheduledDataflowRepository
|
||||
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
|
||||
{
|
||||
$stmt = $qb->executeQuery();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
if ($stmt->rowCount() === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ services:
|
||||
arguments:
|
||||
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
|
||||
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
|
||||
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
tags: ['console.command']
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
|
||||
|
||||
@@ -13,9 +13,6 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function runPendingDataflows(): void
|
||||
{
|
||||
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
|
||||
|
||||
@@ -10,22 +10,19 @@ use Symfony\Component\Validator\Exception\UnexpectedTypeException;
|
||||
|
||||
class FrequencyValidator extends ConstraintValidator
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function validate(mixed $value, Constraint $constraint)
|
||||
{
|
||||
if (!$constraint instanceof Frequency) {
|
||||
throw new UnexpectedTypeException($constraint, Frequency::class);
|
||||
}
|
||||
|
||||
if (null === $value) {
|
||||
if ($value === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$interval = \DateInterval::createFromDateString($value);
|
||||
} catch (\Exception){
|
||||
} catch (\Exception) {
|
||||
$interval = false;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user