mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
405ce87923 |
@@ -1,3 +1,6 @@
|
||||
# Version 5.4.0
|
||||
* Add possibility to save exceptions in file
|
||||
|
||||
# Version 5.3.1
|
||||
* Fix interface naming
|
||||
|
||||
|
||||
14
README.md
14
README.md
@@ -146,6 +146,20 @@ framework:
|
||||
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
|
||||
```
|
||||
|
||||
### Exceptions mode
|
||||
Dataflow can save exceptions in any filesystem you want.
|
||||
This allows dataflow to save exceptions in filesystem instead of the database
|
||||
You have to install `league/flysystem`.
|
||||
|
||||
To enable exceptions mode:
|
||||
|
||||
```yaml
|
||||
code_rhapsodie_dataflow:
|
||||
exceptions_mode:
|
||||
type: 'file'
|
||||
flysystem_service: 'app.filesystem' #The name of the \League\Flysystem\Filesystem service
|
||||
```
|
||||
|
||||
## Define a dataflow type
|
||||
|
||||
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of
|
||||
|
||||
@@ -7,6 +7,7 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Event\Events;
|
||||
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
|
||||
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
|
||||
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
@@ -20,14 +21,16 @@ class JobProcessorTest extends TestCase
|
||||
private JobRepository|MockObject $repository;
|
||||
private DataflowTypeRegistryInterface|MockObject $registry;
|
||||
private EventDispatcherInterface|MockObject $dispatcher;
|
||||
private JobGateway|MockObject $jobGateway;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->repository = $this->createMock(JobRepository::class);
|
||||
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
|
||||
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
|
||||
$this->jobGateway = $this->createMock(JobGateway::class);
|
||||
|
||||
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
|
||||
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway);
|
||||
}
|
||||
|
||||
public function testProcess()
|
||||
@@ -72,7 +75,7 @@ class JobProcessorTest extends TestCase
|
||||
->willReturn($result)
|
||||
;
|
||||
|
||||
$this->repository
|
||||
$this->jobGateway
|
||||
->expects($this->exactly(2))
|
||||
->method('save')
|
||||
;
|
||||
|
||||
@@ -6,7 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
|
||||
use Symfony\Component\Console\Attribute\AsCommand;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
@@ -26,7 +26,7 @@ class JobShowCommand extends Command
|
||||
Job::STATUS_COMPLETED => 'Completed',
|
||||
];
|
||||
|
||||
public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
|
||||
public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory)
|
||||
{
|
||||
parent::__construct();
|
||||
}
|
||||
@@ -58,9 +58,9 @@ class JobShowCommand extends Command
|
||||
}
|
||||
|
||||
if ($scheduleId) {
|
||||
$job = $this->jobRepository->findLastForDataflowId($scheduleId);
|
||||
$job = $this->jobGateway->findLastForDataflowId($scheduleId);
|
||||
} elseif ($jobId) {
|
||||
$job = $this->jobRepository->find($jobId);
|
||||
$job = $this->jobGateway->find($jobId);
|
||||
} else {
|
||||
$io->error('You must pass `job-id` or `schedule-id` option.');
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
|
||||
*/
|
||||
class CodeRhapsodieDataflowExtension extends Extension
|
||||
{
|
||||
public function load(array $configs, ContainerBuilder $container)
|
||||
public function load(array $configs, ContainerBuilder $container): void
|
||||
{
|
||||
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
|
||||
$loader->load('services.yaml');
|
||||
@@ -29,6 +29,12 @@ class CodeRhapsodieDataflowExtension extends Extension
|
||||
|
||||
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
|
||||
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
|
||||
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
|
||||
|
||||
if ($config['exceptions_mode']['type'] === 'file') {
|
||||
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
|
||||
$loader->load('exceptions_services.yaml');
|
||||
}
|
||||
|
||||
if ($config['messenger_mode']['enabled']) {
|
||||
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
|
||||
|
||||
@@ -12,7 +12,7 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class BusCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
public function process(ContainerBuilder $container)
|
||||
public function process(ContainerBuilder $container): void
|
||||
{
|
||||
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
|
||||
return;
|
||||
|
||||
@@ -16,7 +16,7 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
*/
|
||||
class DataflowTypeCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
public function process(ContainerBuilder $container)
|
||||
public function process(ContainerBuilder $container): void
|
||||
{
|
||||
if (!$container->has(DataflowTypeRegistry::class)) {
|
||||
return;
|
||||
|
||||
@@ -12,7 +12,7 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class DefaultLoggerCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
public function process(ContainerBuilder $container)
|
||||
public function process(ContainerBuilder $container): void
|
||||
{
|
||||
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
|
||||
if (!$container->has($defaultLogger)) {
|
||||
|
||||
29
src/DependencyInjection/Compiler/ExceptionCompilerPass.php
Normal file
29
src/DependencyInjection/Compiler/ExceptionCompilerPass.php
Normal file
@@ -0,0 +1,29 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler;
|
||||
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
class ExceptionCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
public function process(ContainerBuilder $container): void
|
||||
{
|
||||
if (!$container->hasParameter('coderhapsodie.dataflow.flysystem_service')) {
|
||||
return;
|
||||
}
|
||||
|
||||
$flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service');
|
||||
if (!$container->has($flysystem)) {
|
||||
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem));
|
||||
}
|
||||
|
||||
$definition = $container->findDefinition(FilesystemExceptionHandler::class);
|
||||
$definition->setArgument('$filesystem', new Reference($flysystem));
|
||||
}
|
||||
}
|
||||
@@ -38,6 +38,20 @@ class Configuration implements ConfigurationInterface
|
||||
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
|
||||
->end()
|
||||
->end()
|
||||
->arrayNode('exceptions_mode')
|
||||
->addDefaultsIfNotSet()
|
||||
->children()
|
||||
->scalarNode('type')
|
||||
->defaultValue('database')
|
||||
->end()
|
||||
->scalarNode('flysystem_service')
|
||||
->end()
|
||||
->validate()
|
||||
->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem'))
|
||||
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
->end()
|
||||
;
|
||||
|
||||
|
||||
12
src/ExceptionsHandler/ExceptionHandlerInterface.php
Normal file
12
src/ExceptionsHandler/ExceptionHandlerInterface.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
|
||||
|
||||
interface ExceptionHandlerInterface
|
||||
{
|
||||
public function save(?int $jobId, ?array $exceptions): void;
|
||||
|
||||
public function find(int $jobId): ?array;
|
||||
}
|
||||
37
src/ExceptionsHandler/FilesystemExceptionHandler.php
Normal file
37
src/ExceptionsHandler/FilesystemExceptionHandler.php
Normal file
@@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
|
||||
|
||||
use League\Flysystem\Filesystem;
|
||||
use League\Flysystem\FilesystemException;
|
||||
|
||||
class FilesystemExceptionHandler implements ExceptionHandlerInterface
|
||||
{
|
||||
public function __construct(private Filesystem $filesystem)
|
||||
{
|
||||
}
|
||||
|
||||
public function save(?int $jobId, ?array $exceptions): void
|
||||
{
|
||||
if ($jobId === null || empty($exceptions)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
|
||||
}
|
||||
|
||||
public function find(int $jobId): ?array
|
||||
{
|
||||
try {
|
||||
if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
|
||||
} catch (FilesystemException) {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
17
src/ExceptionsHandler/NullExceptionHandler.php
Normal file
17
src/ExceptionsHandler/NullExceptionHandler.php
Normal file
@@ -0,0 +1,17 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
|
||||
|
||||
class NullExceptionHandler implements ExceptionHandlerInterface
|
||||
{
|
||||
public function save(?int $jobId, ?array $exceptions): void
|
||||
{
|
||||
}
|
||||
|
||||
public function find(int $jobId): ?array
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
52
src/Gateway/JobGateway.php
Normal file
52
src/Gateway/JobGateway.php
Normal file
@@ -0,0 +1,52 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Gateway;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface;
|
||||
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
|
||||
class JobGateway
|
||||
{
|
||||
public function __construct(private JobRepository $repository, private ExceptionHandlerInterface $exceptionHandler)
|
||||
{
|
||||
}
|
||||
|
||||
public function find(int $jobId): ?Job
|
||||
{
|
||||
$job = $this->repository->find($jobId);
|
||||
|
||||
return $this->loadExceptions($job);
|
||||
}
|
||||
|
||||
public function save(Job $job): void
|
||||
{
|
||||
if (!$this->exceptionHandler instanceof NullExceptionHandler) {
|
||||
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
|
||||
$job->setExceptions([]);
|
||||
}
|
||||
|
||||
$this->repository->save($job);
|
||||
}
|
||||
|
||||
public function findLastForDataflowId(int $scheduleId): ?Job
|
||||
{
|
||||
$job = $this->repository->findLastForDataflowId($scheduleId);
|
||||
|
||||
return $this->loadExceptions($job);
|
||||
}
|
||||
|
||||
private function loadExceptions(?Job $job): ?Job
|
||||
{
|
||||
if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) {
|
||||
return $job;
|
||||
}
|
||||
|
||||
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
|
||||
|
||||
return $job->setExceptions($this->exceptionHandler->find($job->getId()));
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Event\Events;
|
||||
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
|
||||
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
|
||||
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
|
||||
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
@@ -22,8 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
|
||||
{
|
||||
use LoggerAwareTrait;
|
||||
|
||||
public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
|
||||
{
|
||||
public function __construct(
|
||||
private JobRepository $repository,
|
||||
private DataflowTypeRegistryInterface $registry,
|
||||
private EventDispatcherInterface $dispatcher,
|
||||
private JobGateway $jobGateway,
|
||||
) {
|
||||
}
|
||||
|
||||
public function process(Job $job): void
|
||||
@@ -64,7 +69,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
|
||||
->setStatus(Job::STATUS_RUNNING)
|
||||
->setStartTime(new \DateTime())
|
||||
;
|
||||
$this->repository->save($job);
|
||||
$this->jobGateway->save($job);
|
||||
}
|
||||
|
||||
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
|
||||
@@ -75,7 +80,8 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
|
||||
->setCount($result->getSuccessCount())
|
||||
->setExceptions($bufferLogger->clearBuffer())
|
||||
;
|
||||
$this->repository->save($job);
|
||||
|
||||
$this->jobGateway->save($job);
|
||||
|
||||
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
|
||||
}
|
||||
|
||||
5
src/Resources/config/exceptions_services.yaml
Normal file
5
src/Resources/config/exceptions_services.yaml
Normal file
@@ -0,0 +1,5 @@
|
||||
services:
|
||||
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler'
|
||||
CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler:
|
||||
arguments:
|
||||
$filesystem: ~ # Filled in compiler pass
|
||||
@@ -28,7 +28,7 @@ services:
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
|
||||
arguments:
|
||||
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
|
||||
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
|
||||
tags: ['console.command']
|
||||
|
||||
@@ -93,3 +93,10 @@ services:
|
||||
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
|
||||
$dispatcher: '@event_dispatcher'
|
||||
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
|
||||
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler'
|
||||
CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler:
|
||||
CodeRhapsodie\DataflowBundle\Gateway\JobGateway:
|
||||
arguments:
|
||||
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
|
||||
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'
|
||||
|
||||
Reference in New Issue
Block a user