mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b191e33c47 | ||
|
|
8bb55b1303 | ||
|
|
f0459462f7 | ||
|
|
5a76c11bc6 | ||
|
|
d7efd85c8e | ||
|
|
b0d17c31cc | ||
|
|
e72d0d5e8d |
73
.travis.yml
73
.travis.yml
@@ -23,66 +23,76 @@ env:
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
- php: '7.1'
|
||||
- php: '7.2'
|
||||
- php: '7.3'
|
||||
- php: '7.4'
|
||||
- php: '8.0'
|
||||
|
||||
# Enable code coverage with the previous supported PHP version
|
||||
- php: '7.4'
|
||||
env:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
|
||||
# Enable code coverage with the latest supported PHP version
|
||||
- php: '7.3'
|
||||
- php: '8.0'
|
||||
env:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
|
||||
# Minimum supported dependencies with the latest and oldest supported PHP versions
|
||||
- php: '7.1'
|
||||
env:
|
||||
- COMPOSER_FLAGS="--prefer-lowest"
|
||||
- php: '7.3'
|
||||
env:
|
||||
- COMPOSER_FLAGS="--prefer-lowest"
|
||||
# Incompatibility between lowest symfony testing utils and phpunit
|
||||
# - php: '8.0'
|
||||
# env:
|
||||
# - COMPOSER_FLAGS="--prefer-lowest"
|
||||
|
||||
# Test each supported Symfony version with lowest supported PHP version
|
||||
- php: '7.1'
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.3.*
|
||||
- php: '7.1'
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.4.*
|
||||
- php: '7.2'
|
||||
- php: '7.3'
|
||||
env:
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
- SYMFONY_VERSION=5.0.*
|
||||
|
||||
- SYMFONY_VERSION=5.2.*
|
||||
# Test unsupported versions of Symfony
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.0.*
|
||||
- php: '7.1'
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.1.*
|
||||
- php: '7.1'
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.2.*
|
||||
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.3.*
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=5.0.*
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=5.1.*
|
||||
|
||||
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
|
||||
- php: '7.2'
|
||||
env:
|
||||
- STABILITY=dev
|
||||
- SYMFONY_VERSION=5.1.*
|
||||
# - php: '7.2'
|
||||
# env:
|
||||
# - STABILITY=dev
|
||||
# - SYMFONY_VERSION=5.3.*
|
||||
|
||||
# Test upcoming PHP versions with dev dependencies
|
||||
- php: '7.4snapshot'
|
||||
env:
|
||||
- STABILITY=dev
|
||||
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
|
||||
#- php: '7.5snapshot'
|
||||
# env:
|
||||
# - STABILITY=dev
|
||||
# - COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
|
||||
|
||||
allow_failures:
|
||||
# 4.0 not supported because of https://github.com/advisories/GHSA-pgwj-prpq-jpc2
|
||||
- env:
|
||||
- SYMFONY_VERSION=4.0.*
|
||||
- env:
|
||||
@@ -95,6 +105,9 @@ matrix:
|
||||
- env:
|
||||
- STABILITY=dev
|
||||
- SYMFONY_VERSION=5.1.*
|
||||
- env:
|
||||
- STABILITY=dev
|
||||
- SYMFONY_VERSION=5.2.*
|
||||
|
||||
before_install:
|
||||
- if [[ "$SYMFONY_VERSION" != "" ]]; then
|
||||
@@ -108,11 +121,11 @@ before_install:
|
||||
phpenv config-rm xdebug.ini || true;
|
||||
fi
|
||||
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
|
||||
travis_retry composer require --dev satooshi/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
|
||||
travis_retry composer require --dev php-coveralls/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
|
||||
fi
|
||||
|
||||
install:
|
||||
- travis_retry composer update --prefer-dist --no-interaction --no-suggest --no-progress --ansi $COMPOSER_FLAGS
|
||||
- travis_retry composer update --prefer-dist --no-interaction --no-progress --ansi $COMPOSER_FLAGS
|
||||
|
||||
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
|
||||
|
||||
|
||||
15
CHANGELOG.md
15
CHANGELOG.md
@@ -1,3 +1,18 @@
|
||||
# Version 3.0.0
|
||||
|
||||
* Added PHP 8 support
|
||||
* PHP minimum requirements bumped to 7.3
|
||||
* Added Doctrine DBAL 3 support
|
||||
* Doctrine DBAL minimum requirements bumped to 2.12
|
||||
|
||||
# Version 2.2.0
|
||||
|
||||
* Improve logging Dataflow job
|
||||
|
||||
# Version 2.1.1
|
||||
|
||||
* Fixed some Symfony 5 compatibility issues
|
||||
|
||||
# Version 2.1.0
|
||||
|
||||
* Added CollectionWriter and DelegatorWriter
|
||||
|
||||
38
README.md
38
README.md
@@ -3,7 +3,7 @@
|
||||
DataflowBundle is a bundle for Symfony 3.4+
|
||||
providing an easy way to create import / export dataflow.
|
||||
|
||||
[](https://travis-ci.org/code-rhapsodie/dataflow-bundle)
|
||||
[](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
|
||||
|
||||
[](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
|
||||
|
||||
@@ -35,6 +35,10 @@ As the following schema shows, you can define more than one dataflow:
|
||||
|
||||
## Installation
|
||||
|
||||
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
|
||||
|
||||
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
|
||||
|
||||
### Add the dependency
|
||||
|
||||
To install this bundle, run this command :
|
||||
@@ -126,6 +130,14 @@ code_rhapsodie_dataflow:
|
||||
dbal_default_connection: test #Name of the default connection used by Dataflow bundle
|
||||
```
|
||||
|
||||
By default, the `logger` service will be used to log all exceptions and custom messages.
|
||||
If you want to use another logger, like a specific Monolog handler, Add this configuration:
|
||||
|
||||
```yaml
|
||||
code_rhapsodie_dataflow:
|
||||
default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
|
||||
```
|
||||
|
||||
## Define a dataflow type
|
||||
|
||||
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
|
||||
@@ -235,6 +247,30 @@ class MyFirstDataflowType extends AbstractDataflowType
|
||||
|
||||
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
|
||||
|
||||
### Logging
|
||||
|
||||
All exceptions will be caught and written in the logger.
|
||||
If you want to add custom messages in the log, you can inject the logger in your readers / steps / writers.
|
||||
If your DataflowType class extends `AbstractDataflowType`, the logger is accessible as `$this->logger`.
|
||||
|
||||
```php
|
||||
<?php
|
||||
// ...
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
class MyDataflowType extends AbstractDataflowType
|
||||
{
|
||||
// ...
|
||||
protected function buildDataflow(DataflowBuilder $builder, array $options): void
|
||||
{
|
||||
$this->myWriter->setLogger($this->logger);
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
When using the `code-rhapsodie:dataflow:run-pending` command, this logger will also be used to save the log in the corresponding job in the database.
|
||||
|
||||
### Check if your DataflowType is ready
|
||||
|
||||
Execute this command to check if your DataflowType is correctly registered:
|
||||
|
||||
@@ -2,7 +2,12 @@
|
||||
"name": "code-rhapsodie/dataflow-bundle",
|
||||
"description": "Data processing framework inspired by PortPHP",
|
||||
"type": "symfony-bundle",
|
||||
"keywords": ["dataflow", "import", "export", "data processing"],
|
||||
"keywords": [
|
||||
"dataflow",
|
||||
"import",
|
||||
"export",
|
||||
"data processing"
|
||||
],
|
||||
"license": "MIT",
|
||||
"authors": [
|
||||
{
|
||||
@@ -36,18 +41,21 @@
|
||||
}
|
||||
},
|
||||
"require": {
|
||||
"php": "^7.1",
|
||||
"doctrine/dbal": "^2.0",
|
||||
"php": "^7.3||^8.0",
|
||||
"ext-json": "*",
|
||||
"doctrine/dbal": "^2.12||^3.0",
|
||||
"doctrine/doctrine-bundle": "^1.0||^2.0",
|
||||
"psr/log": "^1.1",
|
||||
"symfony/config": "^3.4||^4.0||^5.0",
|
||||
"symfony/console": "^3.4||^4.0||^5.0",
|
||||
"symfony/dependency-injection": "^3.4||^4.0||^5.0",
|
||||
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
|
||||
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
|
||||
"symfony/http-kernel": "^3.4||^4.0||^5.0",
|
||||
"symfony/lock": "^3.4||^4.0||^5.0",
|
||||
"symfony/monolog-bridge": "^3.4||^4.0||^5.0",
|
||||
"symfony/options-resolver": "^3.4||^4.0||^5.0",
|
||||
"symfony/validator": "^3.4||^4.0||^5.0",
|
||||
"symfony/yaml": "^3.4||^4.0||^5.0",
|
||||
"doctrine/doctrine-bundle": "^1.0||^2.0"
|
||||
"symfony/yaml": "^3.4||^4.0||^5.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"phpunit/phpunit": "^7||^8"
|
||||
|
||||
@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DependencyInjection\CodeRhapsodieDataflowExtension;
|
||||
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
|
||||
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\HttpKernel\Bundle\Bundle;
|
||||
|
||||
@@ -23,6 +24,9 @@ class CodeRhapsodieDataflowBundle extends Bundle
|
||||
|
||||
public function build(ContainerBuilder $container)
|
||||
{
|
||||
$container->addCompilerPass(new DataflowTypeCompilerPass());
|
||||
$container
|
||||
->addCompilerPass(new DataflowTypeCompilerPass())
|
||||
->addCompilerPass(new DefaultLoggerCompilerPass())
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,20 +6,25 @@ namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
|
||||
/**
|
||||
* Runs one dataflow.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ExecuteDataflowCommand extends Command
|
||||
class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
|
||||
|
||||
/** @var DataflowTypeRegistryInterface */
|
||||
@@ -28,16 +33,12 @@ class ExecuteDataflowCommand extends Command
|
||||
/** @var ConnectionFactory */
|
||||
private $connectionFactory;
|
||||
|
||||
/** @var LoggerInterface */
|
||||
private $logger;
|
||||
|
||||
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
|
||||
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
|
||||
{
|
||||
parent::__construct();
|
||||
|
||||
$this->registry = $registry;
|
||||
$this->connectionFactory = $connectionFactory;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -68,22 +69,22 @@ EOF
|
||||
}
|
||||
$fqcnOrAlias = $input->getArgument('fqcn');
|
||||
$options = json_decode($input->getArgument('options'), true);
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
|
||||
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
|
||||
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
|
||||
$dataflowType->setLogger($this->logger);
|
||||
}
|
||||
|
||||
$result = $dataflowType->process($options);
|
||||
|
||||
$output->writeln('Executed: '.$result->getName());
|
||||
$output->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
|
||||
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
|
||||
$output->writeln('Success: '.$result->getSuccessCount());
|
||||
$io->writeln('Executed: '.$result->getName());
|
||||
$io->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
|
||||
$io->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
|
||||
$io->writeln('Success: '.$result->getSuccessCount());
|
||||
|
||||
if ($result->hasErrors() > 0) {
|
||||
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
|
||||
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
|
||||
|
||||
foreach ($result->getExceptions() as $e) {
|
||||
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
if ($result->hasErrors()) {
|
||||
$io->error("Errors: {$result->getErrorCount()}\nExceptions traces are available in the logs.");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -95,5 +95,7 @@ class SchemaCommand extends Command
|
||||
foreach ($sqls as $sql) {
|
||||
$io->text($sql.';');
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,10 +4,15 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType;
|
||||
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
abstract class AbstractDataflowType implements DataflowTypeInterface
|
||||
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
@@ -27,6 +32,9 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
|
||||
;
|
||||
$this->buildDataflow($builder, $options);
|
||||
$dataflow = $builder->getDataflow();
|
||||
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
|
||||
$dataflow->setLogger($this->logger);
|
||||
}
|
||||
|
||||
return $dataflow->process();
|
||||
}
|
||||
|
||||
@@ -6,9 +6,13 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
|
||||
class Dataflow implements DataflowInterface
|
||||
class Dataflow implements DataflowInterface, LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
/** @var string */
|
||||
private $name;
|
||||
|
||||
@@ -16,15 +20,17 @@ class Dataflow implements DataflowInterface
|
||||
private $reader;
|
||||
|
||||
/** @var callable[] */
|
||||
private $steps = [];
|
||||
private $steps;
|
||||
|
||||
/** @var WriterInterface[] */
|
||||
private $writers = [];
|
||||
private $writers;
|
||||
|
||||
public function __construct(iterable $reader, ?string $name)
|
||||
{
|
||||
$this->reader = $reader;
|
||||
$this->name = $name;
|
||||
$this->steps = [];
|
||||
$this->writers = [];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -54,27 +60,33 @@ class Dataflow implements DataflowInterface
|
||||
{
|
||||
$count = 0;
|
||||
$exceptions = [];
|
||||
$startTime = new \DateTime();
|
||||
$startTime = new \DateTimeImmutable();
|
||||
|
||||
foreach ($this->writers as $writer) {
|
||||
$writer->prepare();
|
||||
}
|
||||
|
||||
foreach ($this->reader as $index => $item) {
|
||||
try {
|
||||
$this->processItem($item);
|
||||
} catch (\Throwable $e) {
|
||||
$exceptions[$index] = $e;
|
||||
try {
|
||||
foreach ($this->writers as $writer) {
|
||||
$writer->prepare();
|
||||
}
|
||||
|
||||
++$count;
|
||||
foreach ($this->reader as $index => $item) {
|
||||
try {
|
||||
$this->processItem($item);
|
||||
} catch (\Throwable $e) {
|
||||
$exceptions[$index] = $e;
|
||||
$this->logException($e, (string) $index);
|
||||
}
|
||||
|
||||
++$count;
|
||||
}
|
||||
|
||||
foreach ($this->writers as $writer) {
|
||||
$writer->finish();
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$exceptions[] = $e;
|
||||
$this->logException($e);
|
||||
}
|
||||
|
||||
foreach ($this->writers as $writer) {
|
||||
$writer->finish();
|
||||
}
|
||||
|
||||
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
|
||||
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -94,4 +106,13 @@ class Dataflow implements DataflowInterface
|
||||
$writer->write($item);
|
||||
}
|
||||
}
|
||||
|
||||
private function logException(\Throwable $e, ?string $index = null): void
|
||||
{
|
||||
if (!isset($this->logger)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,5 +28,6 @@ class CodeRhapsodieDataflowExtension extends Extension
|
||||
$config = $this->processConfiguration($configuration, $configs);
|
||||
|
||||
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
|
||||
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand;
|
||||
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
|
||||
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class DefaultLoggerCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function process(ContainerBuilder $container)
|
||||
{
|
||||
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
|
||||
if (!$container->has($defaultLogger)) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ([ExecuteDataflowCommand::class, PendingDataflowRunner::class] as $serviceId) {
|
||||
if (!$container->has($serviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$definition = $container->findDefinition($serviceId);
|
||||
$definition->addMethodCall('setLogger', [new Reference($defaultLogger)]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,14 +11,22 @@ class Configuration implements ConfigurationInterface
|
||||
{
|
||||
public function getConfigTreeBuilder()
|
||||
{
|
||||
$treeBuilder = new TreeBuilder();
|
||||
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
|
||||
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
|
||||
if (method_exists($treeBuilder, 'getRootNode')) {
|
||||
$rootNode = $treeBuilder->getRootNode();
|
||||
} else {
|
||||
// BC for symfony/config < 4.2
|
||||
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
|
||||
}
|
||||
|
||||
$rootNode
|
||||
->children()
|
||||
->scalarNode('dbal_default_connection')
|
||||
->defaultValue('default')
|
||||
->end()
|
||||
->scalarNode('default_logger')
|
||||
->defaultValue('logger')
|
||||
->end()
|
||||
->end()
|
||||
;
|
||||
|
||||
|
||||
@@ -68,8 +68,6 @@ class Job
|
||||
|
||||
/**
|
||||
* @var \DateTimeInterface|null
|
||||
*
|
||||
* @Asserts\DateTime()
|
||||
*/
|
||||
private $requestedDate;
|
||||
|
||||
|
||||
@@ -9,4 +9,12 @@ namespace CodeRhapsodie\DataflowBundle\Exceptions;
|
||||
*/
|
||||
class UnknownDataflowTypeException extends \Exception
|
||||
{
|
||||
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
|
||||
{
|
||||
return new self(sprintf(
|
||||
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
|
||||
$aliasOrFqcn,
|
||||
implode(', ', $knownDataflowTypes)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
42
src/Logger/BufferHandler.php
Normal file
42
src/Logger/BufferHandler.php
Normal file
@@ -0,0 +1,42 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Logger;
|
||||
|
||||
use Monolog\Formatter\FormatterInterface;
|
||||
use Monolog\Formatter\LineFormatter;
|
||||
use Monolog\Handler\AbstractProcessingHandler;
|
||||
use Monolog\Logger;
|
||||
|
||||
class BufferHandler extends AbstractProcessingHandler
|
||||
{
|
||||
private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n";
|
||||
|
||||
private $buffer;
|
||||
|
||||
public function __construct($level = Logger::DEBUG, bool $bubble = true)
|
||||
{
|
||||
parent::__construct($level, $bubble);
|
||||
|
||||
$this->buffer = [];
|
||||
}
|
||||
|
||||
public function clearBuffer(): array
|
||||
{
|
||||
$logs = $this->buffer;
|
||||
$this->buffer = [];
|
||||
|
||||
return $logs;
|
||||
}
|
||||
|
||||
protected function write(array $record): void
|
||||
{
|
||||
$this->buffer[] = $record['formatted'];
|
||||
}
|
||||
|
||||
protected function getDefaultFormatter(): FormatterInterface
|
||||
{
|
||||
return new LineFormatter(self::FORMAT);
|
||||
}
|
||||
}
|
||||
32
src/Logger/DelegatingLogger.php
Normal file
32
src/Logger/DelegatingLogger.php
Normal file
@@ -0,0 +1,32 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Logger;
|
||||
|
||||
use Psr\Log\AbstractLogger;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
final class DelegatingLogger extends AbstractLogger
|
||||
{
|
||||
/** @var LoggerInterface[] */
|
||||
private $loggers;
|
||||
|
||||
public function __construct(iterable $loggers)
|
||||
{
|
||||
foreach ($loggers as $logger) {
|
||||
if (!$logger instanceof LoggerInterface) {
|
||||
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, get_class($logger)));
|
||||
}
|
||||
|
||||
$this->loggers[] = $logger;
|
||||
}
|
||||
}
|
||||
|
||||
public function log($level, $message, array $context = [])
|
||||
{
|
||||
foreach ($this->loggers as $logger) {
|
||||
$logger->log($level, $message, $context);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
|
||||
return $this->aliasesRegistry[$fqcnOrAlias];
|
||||
}
|
||||
|
||||
throw new UnknownDataflowTypeException($fqcnOrAlias);
|
||||
throw UnknownDataflowTypeException::create($fqcnOrAlias, array_merge(array_keys($this->fqcnRegistry), array_keys($this->aliasesRegistry)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,6 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Doctrine\DBAL\ParameterType;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
|
||||
/**
|
||||
@@ -21,15 +22,15 @@ class JobRepository
|
||||
public const TABLE_NAME = 'cr_dataflow_job';
|
||||
|
||||
private const FIELDS_TYPE = [
|
||||
'id' => \PDO::PARAM_INT,
|
||||
'status' => \PDO::PARAM_INT,
|
||||
'label' => \PDO::PARAM_STR,
|
||||
'dataflow_type' => \PDO::PARAM_STR,
|
||||
'options' => \PDO::PARAM_STR,
|
||||
'id' => ParameterType::INTEGER,
|
||||
'status' => ParameterType::INTEGER,
|
||||
'label' => ParameterType::STRING,
|
||||
'dataflow_type' => ParameterType::STRING,
|
||||
'options' => ParameterType::STRING,
|
||||
'requested_date' => 'datetime',
|
||||
'scheduled_dataflow_id' => \PDO::PARAM_INT,
|
||||
'count' => \PDO::PARAM_INT,
|
||||
'exceptions' => \PDO::PARAM_STR,
|
||||
'scheduled_dataflow_id' => ParameterType::INTEGER,
|
||||
'count' => ParameterType::INTEGER,
|
||||
'exceptions' => ParameterType::STRING,
|
||||
'start_time' => 'datetime',
|
||||
'end_time' => 'datetime',
|
||||
];
|
||||
@@ -51,7 +52,7 @@ class JobRepository
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb
|
||||
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, \PDO::PARAM_INT)))
|
||||
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, ParameterType::INTEGER)))
|
||||
;
|
||||
|
||||
return $this->returnFirstOrNull($qb);
|
||||
@@ -62,12 +63,12 @@ class JobRepository
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb
|
||||
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
|
||||
$stmt = $qb->execute();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
yield Job::createFromArray($this->initDateTime($this->initArray($row)));
|
||||
}
|
||||
}
|
||||
@@ -76,8 +77,8 @@ class JobRepository
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb
|
||||
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), \PDO::PARAM_INT)))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
|
||||
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), ParameterType::INTEGER)))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
|
||||
|
||||
return $this->returnFirstOrNull($qb);
|
||||
}
|
||||
@@ -86,7 +87,7 @@ class JobRepository
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb->andWhere($qb->expr()->lte('requested_date', $qb->createNamedParameter(new \DateTime(), 'datetime')))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)))
|
||||
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)))
|
||||
->orderBy('requested_date', 'ASC')
|
||||
->setMaxResults(1)
|
||||
;
|
||||
@@ -97,7 +98,7 @@ class JobRepository
|
||||
public function findLastForDataflowId(int $dataflowId): ?Job
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, \PDO::PARAM_INT)))
|
||||
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, ParameterType::INTEGER)))
|
||||
->orderBy('requested_date', 'DESC')
|
||||
->setMaxResults(1)
|
||||
;
|
||||
@@ -115,7 +116,7 @@ class JobRepository
|
||||
if (0 === $stmt->rowCount()) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
yield Job::createFromArray($row);
|
||||
}
|
||||
}
|
||||
@@ -123,14 +124,14 @@ class JobRepository
|
||||
public function findForScheduled(int $id): iterable
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, \PDO::PARAM_INT)))
|
||||
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, ParameterType::INTEGER)))
|
||||
->orderBy('requested_date', 'DESC')
|
||||
->setMaxResults(20);
|
||||
$stmt = $qb->execute();
|
||||
if (0 === $stmt->rowCount()) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
yield Job::createFromArray($row);
|
||||
}
|
||||
}
|
||||
@@ -172,6 +173,6 @@ class JobRepository
|
||||
return null;
|
||||
}
|
||||
|
||||
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
|
||||
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
use Doctrine\DBAL\ParameterType;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
|
||||
/**
|
||||
@@ -20,13 +21,13 @@ class ScheduledDataflowRepository
|
||||
public const TABLE_NAME = 'cr_dataflow_scheduled';
|
||||
|
||||
private const FIELDS_TYPE = [
|
||||
'id' => \PDO::PARAM_INT,
|
||||
'label' => \PDO::PARAM_STR,
|
||||
'dataflow_type' => \PDO::PARAM_STR,
|
||||
'options' => \PDO::PARAM_STR,
|
||||
'frequency' => \PDO::PARAM_STR,
|
||||
'id' => ParameterType::INTEGER,
|
||||
'label' => ParameterType::STRING,
|
||||
'dataflow_type' => ParameterType::STRING,
|
||||
'options' => ParameterType::STRING,
|
||||
'frequency' => ParameterType::STRING,
|
||||
'next' => 'datetime',
|
||||
'enabled' => \PDO::PARAM_BOOL,
|
||||
'enabled' => ParameterType::BOOLEAN,
|
||||
];
|
||||
/**
|
||||
* @var \Doctrine\DBAL\Connection
|
||||
@@ -58,7 +59,7 @@ class ScheduledDataflowRepository
|
||||
if (0 === $stmt->rowCount()) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
|
||||
}
|
||||
}
|
||||
@@ -66,7 +67,7 @@ class ScheduledDataflowRepository
|
||||
public function find(int $scheduleId): ?ScheduledDataflow
|
||||
{
|
||||
$qb = $this->createQueryBuilder();
|
||||
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, \PDO::PARAM_INT)))
|
||||
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, ParameterType::INTEGER)))
|
||||
->setMaxResults(1)
|
||||
;
|
||||
|
||||
@@ -82,7 +83,7 @@ class ScheduledDataflowRepository
|
||||
if (0 === $stmt->rowCount()) {
|
||||
return [];
|
||||
}
|
||||
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
|
||||
while (false !== ($row = $stmt->fetchAssociative())) {
|
||||
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
|
||||
}
|
||||
}
|
||||
@@ -96,7 +97,7 @@ class ScheduledDataflowRepository
|
||||
->orderBy('w.label', 'ASC')
|
||||
->groupBy('w.id');
|
||||
|
||||
return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
|
||||
return $query->execute()->fetchAllAssociative();
|
||||
}
|
||||
|
||||
public function save(ScheduledDataflow $scheduledDataflow)
|
||||
@@ -147,6 +148,6 @@ class ScheduledDataflowRepository
|
||||
return null;
|
||||
}
|
||||
|
||||
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
|
||||
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ services:
|
||||
arguments:
|
||||
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
|
||||
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
|
||||
$logger: '@logger'
|
||||
tags: ['console.command']
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
|
||||
|
||||
@@ -8,12 +8,20 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Event\Events;
|
||||
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
|
||||
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
|
||||
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Monolog\Logger;
|
||||
use Psr\Log\LoggerAwareInterface;
|
||||
use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
|
||||
class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
/** @var JobRepository */
|
||||
private $repository;
|
||||
|
||||
@@ -39,9 +47,25 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
$this->beforeProcessing($job);
|
||||
|
||||
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
|
||||
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
|
||||
if (isset($this->logger)) {
|
||||
$loggers[] = $this->logger;
|
||||
}
|
||||
$logger = new DelegatingLogger($loggers);
|
||||
|
||||
if ($dataflowType instanceof LoggerAwareInterface) {
|
||||
$dataflowType->setLogger($logger);
|
||||
}
|
||||
|
||||
$result = $dataflowType->process($job->getOptions());
|
||||
|
||||
$this->afterProcessing($job, $result);
|
||||
if (!$dataflowType instanceof LoggerAwareInterface) {
|
||||
foreach ($result->getExceptions() as $index => $e) {
|
||||
$logger->error($e, ['index' => $index]);
|
||||
}
|
||||
}
|
||||
|
||||
$this->afterProcessing($job, $result, $bufferHandler);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,19 +85,13 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
$this->repository->save($job);
|
||||
}
|
||||
|
||||
private function afterProcessing(Job $job, Result $result): void
|
||||
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
|
||||
{
|
||||
$exceptions = [];
|
||||
/** @var \Exception $exception */
|
||||
foreach ($result->getExceptions() as $exception) {
|
||||
$exceptions[] = (string) $exception;
|
||||
}
|
||||
|
||||
$job
|
||||
->setEndTime($result->getEndTime())
|
||||
->setStatus(Job::STATUS_COMPLETED)
|
||||
->setCount($result->getSuccessCount())
|
||||
->setExceptions($exceptions)
|
||||
->setExceptions($bufferLogger->clearBuffer())
|
||||
;
|
||||
$this->repository->save($job);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user