2 Commits

Author SHA1 Message Date
AUDUL
405ce87923 * Add possibility to save exceptions in file (#81)
* * Add possibility to save exceptions in file
2025-10-07 16:35:59 +02:00
AUDUL
0938f7fd1e Fix interface naming (#80)
* Fix interface naming
2025-10-02 15:36:14 +02:00
17 changed files with 225 additions and 17 deletions

View File

@@ -1,3 +1,9 @@
# Version 5.4.0
* Add possibility to save exceptions in file
# Version 5.3.1
* Fix interface naming
# Version 5.3.0
* Added auto update count processed item while running job

View File

@@ -146,6 +146,20 @@ framework:
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
```
### Exceptions mode
Dataflow can save exceptions in any filesystem you want.
This allows dataflow to save exceptions in filesystem instead of the database
You have to install `league/flysystem`.
To enable exceptions mode:
```yaml
code_rhapsodie_dataflow:
exceptions_mode:
type: 'file'
flysystem_service: 'app.filesystem' #The name of the \League\Flysystem\Filesystem service
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of

View File

@@ -7,6 +7,7 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
@@ -20,14 +21,16 @@ class JobProcessorTest extends TestCase
private JobRepository|MockObject $repository;
private DataflowTypeRegistryInterface|MockObject $registry;
private EventDispatcherInterface|MockObject $dispatcher;
private JobGateway|MockObject $jobGateway;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->jobGateway = $this->createMock(JobGateway::class);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway);
}
public function testProcess()
@@ -72,7 +75,7 @@ class JobProcessorTest extends TestCase
->willReturn($result)
;
$this->repository
$this->jobGateway
->expects($this->exactly(2))
->method('save')
;

View File

@@ -6,7 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
@@ -26,7 +26,7 @@ class JobShowCommand extends Command
Job::STATUS_COMPLETED => 'Completed',
];
public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory)
{
parent::__construct();
}
@@ -58,9 +58,9 @@ class JobShowCommand extends Command
}
if ($scheduleId) {
$job = $this->jobRepository->findLastForDataflowId($scheduleId);
$job = $this->jobGateway->findLastForDataflowId($scheduleId);
} elseif ($jobId) {
$job = $this->jobRepository->find($jobId);
$job = $this->jobGateway->find($jobId);
} else {
$io->error('You must pass `job-id` or `schedule-id` option.');

View File

@@ -15,7 +15,7 @@ use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
*/
class CodeRhapsodieDataflowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yaml');
@@ -29,6 +29,12 @@ class CodeRhapsodieDataflowExtension extends Extension
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);
if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
$loader->load('exceptions_services.yaml');
}
if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);

View File

@@ -12,7 +12,7 @@ use Symfony\Component\DependencyInjection\Reference;
class BusCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;

View File

@@ -16,7 +16,7 @@ use Symfony\Component\DependencyInjection\Reference;
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->has(DataflowTypeRegistry::class)) {
return;

View File

@@ -12,7 +12,7 @@ use Symfony\Component\DependencyInjection\Reference;
class DefaultLoggerCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
if (!$container->has($defaultLogger)) {

View File

@@ -0,0 +1,29 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;
class ExceptionCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.flysystem_service')) {
return;
}
$flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service');
if (!$container->has($flysystem)) {
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem));
}
$definition = $container->findDefinition(FilesystemExceptionHandler::class);
$definition->setArgument('$filesystem', new Reference($flysystem));
}
}

View File

@@ -38,6 +38,20 @@ class Configuration implements ConfigurationInterface
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->arrayNode('exceptions_mode')
->addDefaultsIfNotSet()
->children()
->scalarNode('type')
->defaultValue('database')
->end()
->scalarNode('flysystem_service')
->end()
->validate()
->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem'))
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->end()
->end()
;

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
interface ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void;
public function find(int $jobId): ?array;
}

View File

@@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
use League\Flysystem\Filesystem;
use League\Flysystem\FilesystemException;
class FilesystemExceptionHandler implements ExceptionHandlerInterface
{
public function __construct(private Filesystem $filesystem)
{
}
public function save(?int $jobId, ?array $exceptions): void
{
if ($jobId === null || empty($exceptions)) {
return;
}
$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
}
public function find(int $jobId): ?array
{
try {
if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) {
return [];
}
return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
} catch (FilesystemException) {
return [];
}
}
}

View File

@@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;
class NullExceptionHandler implements ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void
{
}
public function find(int $jobId): ?array
{
return null;
}
}

View File

@@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Gateway;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
class JobGateway
{
public function __construct(private JobRepository $repository, private ExceptionHandlerInterface $exceptionHandler)
{
}
public function find(int $jobId): ?Job
{
$job = $this->repository->find($jobId);
return $this->loadExceptions($job);
}
public function save(Job $job): void
{
if (!$this->exceptionHandler instanceof NullExceptionHandler) {
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
$job->setExceptions([]);
}
$this->repository->save($job);
}
public function findLastForDataflowId(int $scheduleId): ?Job
{
$job = $this->repository->findLastForDataflowId($scheduleId);
return $this->loadExceptions($job);
}
private function loadExceptions(?Job $job): ?Job
{
if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) {
return $job;
}
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
return $job->setExceptions($this->exceptionHandler->find($job->getId()));
}
}

View File

@@ -4,11 +4,12 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\RepositoryInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\AutoUpdateCountInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
@@ -22,8 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
{
public function __construct(
private JobRepository $repository,
private DataflowTypeRegistryInterface $registry,
private EventDispatcherInterface $dispatcher,
private JobGateway $jobGateway,
) {
}
public function process(Job $job): void
@@ -31,7 +36,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
if ($dataflowType instanceof RepositoryInterface) {
if ($dataflowType instanceof AutoUpdateCountInterface) {
$dataflowType->setRepository($this->repository);
}
@@ -64,7 +69,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
$this->jobGateway->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
@@ -75,7 +80,8 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
$this->jobGateway->save($job);
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}

View File

@@ -0,0 +1,5 @@
services:
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler:
arguments:
$filesystem: ~ # Filled in compiler pass

View File

@@ -28,7 +28,7 @@ services:
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
@@ -93,3 +93,10 @@ services:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$dispatcher: '@event_dispatcher'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler:
CodeRhapsodie\DataflowBundle\Gateway\JobGateway:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'