2 Commits

Author SHA1 Message Date
jbcr
8bb55b1303 Update CHANGELOG.md (#56) 2021-01-15 15:18:36 +01:00
jeremycr
f0459462f7 Improved logging (#55)
* Improved logging

* Better handling of default logger

* Update src/DataflowType/Dataflow/Dataflow.php

* Updated README

Co-authored-by: jbcr <51637606+jbcr@users.noreply.github.com>
2021-01-15 14:38:48 +01:00
16 changed files with 268 additions and 53 deletions

View File

@@ -1,3 +1,7 @@
# Version 2.2.0
* Improve logging Dataflow job
# Version 2.1.1
* Fixed some Symfony 5 compatibility issues

View File

@@ -130,6 +130,14 @@ code_rhapsodie_dataflow:
dbal_default_connection: test #Name of the default connection used by Dataflow bundle
```
By default, the `logger` service will be used to log all exceptions and custom messages.
If you want to use another logger, like a specific Monolog handler, Add this configuration:
```yaml
code_rhapsodie_dataflow:
default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
@@ -239,6 +247,30 @@ class MyFirstDataflowType extends AbstractDataflowType
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
### Logging
All exceptions will be caught and written in the logger.
If you want to add custom messages in the log, you can inject the logger in your readers / steps / writers.
If your DataflowType class extends `AbstractDataflowType`, the logger is accessible as `$this->logger`.
```php
<?php
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;
class MyDataflowType extends AbstractDataflowType
{
// ...
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$this->myWriter->setLogger($this->logger);
}
}
```
When using the `code-rhapsodie:dataflow:run-pending` command, this logger will also be used to save the log in the corresponding job in the database.
### Check if your DataflowType is ready
Execute this command to check if your DataflowType is correctly registered:

View File

@@ -2,7 +2,12 @@
"name": "code-rhapsodie/dataflow-bundle",
"description": "Data processing framework inspired by PortPHP",
"type": "symfony-bundle",
"keywords": ["dataflow", "import", "export", "data processing"],
"keywords": [
"dataflow",
"import",
"export",
"data processing"
],
"license": "MIT",
"authors": [
{
@@ -37,17 +42,20 @@
},
"require": {
"php": "^7.1",
"ext-json": "*",
"doctrine/dbal": "^2.0",
"doctrine/doctrine-bundle": "^1.0||^2.0",
"psr/log": "^1.1",
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0",
"doctrine/doctrine-bundle": "^1.0||^2.0"
"symfony/yaml": "^3.4||^4.0||^5.0"
},
"require-dev": {
"phpunit/phpunit": "^7||^8"

View File

@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle;
use CodeRhapsodie\DataflowBundle\DependencyInjection\CodeRhapsodieDataflowExtension;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -23,6 +24,9 @@ class CodeRhapsodieDataflowBundle extends Bundle
public function build(ContainerBuilder $container)
{
$container->addCompilerPass(new DataflowTypeCompilerPass());
$container
->addCompilerPass(new DataflowTypeCompilerPass())
->addCompilerPass(new DefaultLoggerCompilerPass())
;
}
}

View File

@@ -6,20 +6,25 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* Runs one dataflow.
*
* @codeCoverageIgnore
*/
class ExecuteDataflowCommand extends Command
class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
/** @var DataflowTypeRegistryInterface */
@@ -28,16 +33,12 @@ class ExecuteDataflowCommand extends Command
/** @var ConnectionFactory */
private $connectionFactory;
/** @var LoggerInterface */
private $logger;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
$this->logger = $logger;
}
/**
@@ -68,22 +69,22 @@ EOF
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
$dataflowType->setLogger($this->logger);
}
$result = $dataflowType->process($options);
$output->writeln('Executed: '.$result->getName());
$output->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$output->writeln('Success: '.$result->getSuccessCount());
$io->writeln('Executed: '.$result->getName());
$io->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
$io->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$io->writeln('Success: '.$result->getSuccessCount());
if ($result->hasErrors() > 0) {
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
foreach ($result->getExceptions() as $e) {
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
}
if ($result->hasErrors()) {
$io->error("Errors: {$result->getErrorCount()}\nExceptions traces are available in the logs.");
return 1;
}

View File

@@ -4,10 +4,15 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
abstract class AbstractDataflowType implements DataflowTypeInterface
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/**
* @codeCoverageIgnore
*/
@@ -27,6 +32,9 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
;
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
$dataflow->setLogger($this->logger);
}
return $dataflow->process();
}

View File

@@ -6,9 +6,13 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
class Dataflow implements DataflowInterface
class Dataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
@@ -16,15 +20,17 @@ class Dataflow implements DataflowInterface
private $reader;
/** @var callable[] */
private $steps = [];
private $steps;
/** @var WriterInterface[] */
private $writers = [];
private $writers;
public function __construct(iterable $reader, ?string $name)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
}
/**
@@ -54,27 +60,33 @@ class Dataflow implements DataflowInterface
{
$count = 0;
$exceptions = [];
$startTime = new \DateTime();
$startTime = new \DateTimeImmutable();
foreach ($this->writers as $writer) {
$writer->prepare();
}
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
try {
foreach ($this->writers as $writer) {
$writer->prepare();
}
++$count;
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
$this->logException($e, (string) $index);
}
++$count;
}
foreach ($this->writers as $writer) {
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
$this->logException($e);
}
foreach ($this->writers as $writer) {
$writer->finish();
}
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
@@ -94,4 +106,13 @@ class Dataflow implements DataflowInterface
$writer->write($item);
}
}
private function logException(\Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
}
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
}
}

View File

@@ -28,5 +28,6 @@ class CodeRhapsodieDataflowExtension extends Extension
$config = $this->processConfiguration($configuration, $configs);
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
}
}

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
class DefaultLoggerCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
if (!$container->has($defaultLogger)) {
return;
}
foreach ([ExecuteDataflowCommand::class, PendingDataflowRunner::class] as $serviceId) {
if (!$container->has($serviceId)) {
continue;
}
$definition = $container->findDefinition($serviceId);
$definition->addMethodCall('setLogger', [new Reference($defaultLogger)]);
}
}
}

View File

@@ -24,6 +24,9 @@ class Configuration implements ConfigurationInterface
->scalarNode('dbal_default_connection')
->defaultValue('default')
->end()
->scalarNode('default_logger')
->defaultValue('logger')
->end()
->end()
;

View File

@@ -9,4 +9,12 @@ namespace CodeRhapsodie\DataflowBundle\Exceptions;
*/
class UnknownDataflowTypeException extends \Exception
{
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
{
return new self(sprintf(
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
$aliasOrFqcn,
implode(', ', $knownDataflowTypes)
));
}
}

View File

@@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Logger;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
class BufferHandler extends AbstractProcessingHandler
{
private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n";
private $buffer;
public function __construct($level = Logger::DEBUG, bool $bubble = true)
{
parent::__construct($level, $bubble);
$this->buffer = [];
}
public function clearBuffer(): array
{
$logs = $this->buffer;
$this->buffer = [];
return $logs;
}
protected function write(array $record): void
{
$this->buffer[] = $record['formatted'];
}
protected function getDefaultFormatter(): FormatterInterface
{
return new LineFormatter(self::FORMAT);
}
}

View File

@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Logger;
use Psr\Log\AbstractLogger;
use Psr\Log\LoggerInterface;
final class DelegatingLogger extends AbstractLogger
{
/** @var LoggerInterface[] */
private $loggers;
public function __construct(iterable $loggers)
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, get_class($logger)));
}
$this->loggers[] = $logger;
}
}
public function log($level, $message, array $context = [])
{
foreach ($this->loggers as $logger) {
$logger->log($level, $message, $context);
}
}
}

View File

@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw new UnknownDataflowTypeException($fqcnOrAlias);
throw UnknownDataflowTypeException::create($fqcnOrAlias, array_merge(array_keys($this->fqcnRegistry), array_keys($this->aliasesRegistry)));
}
/**

View File

@@ -23,7 +23,6 @@ services:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
$logger: '@logger'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:

View File

@@ -8,12 +8,20 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface
class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
@@ -39,9 +47,25 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
$this->afterProcessing($job, $result);
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
}
@@ -61,19 +85,13 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result): void
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$exceptions = [];
/** @var \Exception $exception */
foreach ($result->getExceptions() as $exception) {
$exceptions[] = (string) $exception;
}
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($exceptions)
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);