15 Commits

Author SHA1 Message Date
AUDUL
9d76b45771 Added custom index for job status (#78)
* Added custom index for job status
2025-07-04 09:26:13 +02:00
Matt Mankins
db37c4bdd1 Update Kudos Github Action to support generation from source repo only (#71)
* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update GitHub Action workflow for Semicolons Kudos Action

---------

Co-authored-by: semicolons-for-kudos[bot] <145267638+semicolons-for-kudos[bot]@users.noreply.github.com>
2023-12-27 17:29:46 +01:00
Olivier PORTIER
f20cd96ec5 Update semicolons-kudos.yaml 2023-12-20 11:49:50 +01:00
Olivier PORTIER
fd2c6aaab5 Update semicolons-kudos.yaml (#70) 2023-12-20 11:36:51 +01:00
Olivier PORTIER
4efd310a6e Initiate Kudos on dataflow-bundle by creating new file semicolons-kudos.yaml (#69) 2023-12-20 11:11:26 +01:00
Jérémy J
cec42a3337 Fix log exception argument typing 2023-12-06 13:56:16 +01:00
jbcr
d440ad008b add sonar config 2023-11-16 16:50:24 +01:00
jeremycr
e8b362526a Fix DBAL 2.12 compatibility break (#68) 2023-07-27 16:47:50 +02:00
jeremycr
3c56a90a93 Added the possibility to define a custom item index for exception logs (#66) 2023-07-27 09:38:22 +02:00
jeremycr
1b2b1be958 Removed travis for now, to be replaced by github actions in the future (#67) 2023-07-27 09:29:34 +02:00
jeremycr
25b2e9ec0f Upgrade for Symfony 6 (#65)
* Upgrade for Symfony 6
2022-08-18 09:36:43 +02:00
jeremycr
d89d7b72b0 Updated changelog for 3.1.0 (#64) 2021-04-21 10:40:27 +02:00
jeremycr
3cf05b555d Added messenger mode (#63) 2021-04-21 09:35:39 +02:00
Mathieu Ledru
9624f68675 Introduce the possibility to add asynchronous steps (#61) 2021-04-21 09:20:48 +02:00
jeremycr
b191e33c47 Added PHP 8 support and DBAL 3 support (#59) 2021-03-23 09:06:04 +01:00
64 changed files with 1172 additions and 815 deletions

View File

@@ -1,3 +0,0 @@
service_name : travis-ci
coverage_clover: var/build/clover.xml
json_path : var/build/upload.json

27
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: Build
on:
push:
branches:
- master
jobs:
build:
name: Build
runs-on: ubuntu-latest
permissions: read-all
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- uses: sonarsource/sonarqube-scan-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
# If you wish to fail your job when the Quality Gate is red, uncomment the
# following lines. This would typically be used to fail a deployment.
# - uses: sonarsource/sonarqube-quality-gate-action@master
# timeout-minutes: 5
# env:
# SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

26
.github/workflows/semicolons-kudos.yaml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Kudos for Code
on:
push:
branches: ["master"]
workflow_dispatch:
jobs:
kudos:
name: Semicolons Kudos
permissions: write-all
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: LoremLabs/kudos-for-code-action@latest
with:
search-dir: "."
destination: "artifact"
generate-nomerges: true
generate-validemails: true
generate-limitdepth: 0
generate-fromrepo: true
analyze-repo: false
skip-ids: ""

View File

@@ -1,128 +0,0 @@
language: php
sudo: false
cache:
directories:
- $HOME/.composer/cache
branches:
only:
- master
- /^\d+\.\d+$/
- travis-setup
env:
global:
- SYMFONY_DEPRECATIONS_HELPER="max[self]=0"
- PHPUNIT_FLAGS="-v"
- PHPUNIT_ENABLED="true"
- STABILITY=stable
- COVERALLS_ENABLED="false"
matrix:
fast_finish: true
include:
- php: '7.1'
- php: '7.2'
- php: '7.3'
- php: '7.4'
# Enable code coverage with the previous supported PHP version
- php: '7.3'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Enable code coverage with the latest supported PHP version
- php: '7.4'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Minimum supported dependencies with the latest and oldest supported PHP versions
- php: '7.1'
env:
- COMPOSER_FLAGS="--prefer-lowest"
- php: '7.3'
env:
- COMPOSER_FLAGS="--prefer-lowest"
# Test each supported Symfony version with lowest supported PHP version
- php: '7.1'
env:
- SYMFONY_VERSION=3.4.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.3.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.4.*
- php: '7.2'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.0.*
# Test unsupported versions of Symfony
- php: '7.1'
env:
- SYMFONY_VERSION=4.1.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
- php: '7.2'
env:
- STABILITY=dev
- SYMFONY_VERSION=5.1.*
# Test upcoming PHP versions with dev dependencies
#- php: '7.5snapshot'
# env:
# - STABILITY=dev
# - COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
allow_failures:
# 4.0 not supported because of https://github.com/advisories/GHSA-pgwj-prpq-jpc2
- env:
- SYMFONY_VERSION=4.0.*
- env:
- SYMFONY_VERSION=4.1.*
- env:
- SYMFONY_VERSION=4.2.*
- env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
- env:
- STABILITY=dev
- SYMFONY_VERSION=5.1.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
travis_retry composer global require "symfony/flex:^1.4";
composer config extra.symfony.require $SYMFONY_VERSION;
fi
- if [[ "$STABILITY" != "stable" ]]; then
travis_retry composer config minimum-stability $STABILITY;
fi
- if [[ "$COVERALLS_ENABLED" != "true" ]]; then
phpenv config-rm xdebug.ini || true;
fi
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
travis_retry composer require --dev satooshi/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
fi
install:
- travis_retry composer update --prefer-dist --no-interaction --no-suggest --no-progress --ansi $COMPOSER_FLAGS
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;

View File

@@ -1,3 +1,26 @@
# Version 4.2.0
* Added custom index for job status
# Version 4.1.0
* Added custom index for exception log
# Version 4.0.0
* Added Symfony 6 support
* PHP minimum requirements bumped to 8.0
# Version 3.1.0
* Added optional "messenger mode", to delegate jobs execution to workers from the Symfony messenger component
* Added support for asynchronous steps execution, using the AMPHP library (contribution from [matyo91](https://github.com/matyo91))
# Version 3.0.0
* Added PHP 8 support
* PHP minimum requirements bumped to 7.3
* Added Doctrine DBAL 3 support
* Doctrine DBAL minimum requirements bumped to 2.12
# Version 2.2.0
* Improve logging Dataflow job

View File

@@ -3,13 +3,9 @@
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
[![Build Status](https://travis-ci.com/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps
* any number of steps that can be synchronous or asynchronous
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
@@ -20,7 +16,6 @@ As the following schema shows, you can define more than one dataflow:
![Dataflow schema](src/Resources/doc/schema.png)
# Features
* Define and configure a Dataflow
@@ -37,8 +32,6 @@ As the following schema shows, you can define more than one dataflow:
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
### Add the dependency
To install this bundle, run this command :
@@ -138,6 +131,31 @@ code_rhapsodie_dataflow:
default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
```
### Messenger mode
Dataflow can delegate the execution of its jobs to the Symfony messenger component, if available.
This allows jobs to be executed concurrently by workers instead of sequentially.
To enable messenger mode:
```yaml
code_rhapsodie_dataflow:
messenger_mode:
enabled: true
# bus: 'messenger.default_bus' #Service ID of the bus you want Dataflow to use, if not the default one
```
You also need to route Dataflow messages to the proper transport:
```yaml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
@@ -216,6 +234,7 @@ If you're using Symfony auto-configuration for your services, this tag will be a
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
@@ -247,6 +266,10 @@ class MyFirstDataflowType extends AbstractDataflowType
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
For asynchronous management, `AbstractDataflowType` come with two default options :
- loopInterval : default to 0. Update this interval if you wish customise the `tick` loop duration.
- emitInterval : default to 0. Update this interval to have a control when reader must emit new data in the flow pipeline.
### Logging
All exceptions will be caught and written in the logger.
@@ -340,6 +363,7 @@ $builder->setReader(($this->myReader)())
*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
- converters, that alter the element
- filters, that conditionally prevent further operations on the element
- generators, that can include asynchronous operations
A *Step* can be any callable, taking the element as its argument, and returning either:
- the element, possibly altered
@@ -357,6 +381,16 @@ $builder->addStep(function ($item) {
return $item;
});
// asynchronous step with 2 scale factor
$builder->addStep(function ($item): \Generator {
yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long
// Titles are changed to all caps before export
$item['title'] = strtolower($item['title']);
return $item;
}, 2);
$builder->addStep(function ($item) {
// Private items are not exported
if ($item['private']) {
@@ -368,6 +402,8 @@ $builder->addStep(function ($item) {
//[...]
```
Note : you can ensure writing order for asynchronous operations if all steps are scaled at 1 factor.
### Writers
*Writers* perform the actual import / export operations.
@@ -527,6 +563,8 @@ Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:run-pending` Executes job in the queue according to their schedule.
When messenger mode is enabled, jobs will still be created according to their schedule, but execution will be handled by the messenger component instead.
`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
`code-rhapsodie:dataflow:schedule:change-status` Enable or disable a scheduled dataflow
@@ -564,6 +602,8 @@ $ bin/console code-rhapsodie:dataflow:run-pending --connection=default
Please report issues and request features at https://github.com/code-rhapsodie/dataflow-bundle/issues.
Please note that only the last release of the 3.x and the 4.x versions of this bundle are actively supported.
# Contributing
Contributions are very welcome. Please see [CONTRIBUTING.md](CONTRIBUTING.md) for

View File

@@ -18,17 +18,8 @@ class AbstractDataflowTypeTest extends TestCase
$dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType
{
private $label;
private $options;
private $values;
private $testCase;
public function __construct(string $label, array $options, array $values, TestCase $testCase)
public function __construct(private string $label, private array $options, private array $values, private TestCase $testCase)
{
$this->label = $label;
$this->options = $options;
$this->values = $values;
$this->testCase = $testCase;
}
public function getLabel(): string

View File

@@ -0,0 +1,47 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Dataflow;
use Amp\Delayed;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use PHPUnit\Framework\TestCase;
class AMPAsyncDataflowTest extends TestCase
{
public function testProcess()
{
$reader = [1, 2, 3];
$result = [];
$dataflow = new AMPAsyncDataflow($reader, 'simple');
$dataflow->addStep(static fn($item) => $item + 1);
$dataflow->addStep(static function($item): \Generator {
yield new Delayed(10); //delay 10 milliseconds
return $item * 2;
});
$dataflow->addWriter(new class($result) implements WriterInterface {
private $buffer;
public function __construct(&$buffer) {
$this->buffer = &$buffer;
}
public function prepare()
{
}
public function write($item)
{
$this->buffer[] = $item;
}
public function finish()
{
}
});
$dataflow->process();
self::assertSame([4, 6, 8], $result);
}
}

View File

@@ -43,9 +43,7 @@ class CollectionWriterTest extends TestCase
$embeddedWriter
->expects($this->exactly(count($values)))
->method('write')
->withConsecutive(...array_map(function ($item) {
return [$item];
}, $values))
->withConsecutive(...array_map(fn($item) => [$item], $values))
;
$writer = new CollectionWriter($embeddedWriter);

View File

@@ -10,34 +10,24 @@ use PHPUnit\Framework\TestCase;
class DelegatorWriterTest extends TestCase
{
/** @var DelegatorWriter */
private $delegatorWriter;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter $delegatorWriter;
/** @var DelegateWriterInterface|MockObject */
private $delegateInt;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateInt;
/** @var DelegateWriterInterface|MockObject */
private $delegateString;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateString;
/** @var DelegateWriterInterface|MockObject */
private $delegateArray;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateArray;
protected function setUp(): void
{
$this->delegateInt = $this->createMock(DelegateWriterInterface::class);
$this->delegateInt->method('supports')->willReturnCallback(function ($argument) {
return is_int($argument);
});
$this->delegateInt->method('supports')->willReturnCallback(fn($argument) => is_int($argument));
$this->delegateString = $this->createMock(DelegateWriterInterface::class);
$this->delegateString->method('supports')->willReturnCallback(function ($argument) {
return is_string($argument);
});
$this->delegateString->method('supports')->willReturnCallback(fn($argument) => is_string($argument));
$this->delegateArray = $this->createMock(DelegateWriterInterface::class);
$this->delegateArray->method('supports')->willReturnCallback(function ($argument) {
return is_array($argument);
});
$this->delegateArray->method('supports')->willReturnCallback(fn($argument) => is_array($argument));
$this->delegatorWriter = new DelegatorWriter();
$this->delegatorWriter->addDelegates([

View File

@@ -9,24 +9,19 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\DBAL\Connection;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class ScheduledDataflowManagerTest extends TestCase
{
/** @var ScheduledDataflowManager */
private $manager;
private \CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager $manager;
/** @var Connection|MockObject */
private $connection;
private \Doctrine\DBAL\Connection|\PHPUnit\Framework\MockObject\MockObject $connection;
/** @var ScheduledDataflowRepository|MockObject */
private $scheduledDataflowRepository;
private \CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository|\PHPUnit\Framework\MockObject\MockObject $scheduledDataflowRepository;
/** @var JobRepository|MockObject */
private $jobRepository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $jobRepository;
protected function setUp(): void
{
@@ -70,16 +65,12 @@ class ScheduledDataflowManagerTest extends TestCase
->expects($this->once())
->method('save')
->with(
$this->callback(function (Job $job) use ($type, $options, $next, $label, $scheduled2) {
return (
$job->getStatus() === Job::STATUS_PENDING
&& $job->getDataflowType() === $type
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflowId() === $scheduled2->getId()
);
})
$this->callback(fn(Job $job) => $job->getStatus() === Job::STATUS_PENDING
&& $job->getDataflowType() === $type
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflowId() === $scheduled2->getId())
)
;

View File

@@ -0,0 +1,55 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Tests\MessengerMode;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class JobMessageHandlerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
private \CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler $handler;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->handler = new JobMessageHandler($this->repository, $this->processor);
}
public function testGetHandledMessages()
{
$this->assertSame([JobMessage::class], JobMessageHandler::getHandledMessages());
}
public function testInvoke()
{
$message = new JobMessage($id = 32);
$this->repository
->expects($this->once())
->method('find')
->with($id)
->willReturn($job = new Job())
;
$this->processor
->expects($this->once())
->method('process')
->with($job)
;
($this->handler)($message);
}
}

View File

@@ -0,0 +1,108 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessorTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessor $processor;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface|\PHPUnit\Framework\MockObject\MockObject $registry;
private \Symfony\Component\EventDispatcher\EventDispatcherInterface|\PHPUnit\Framework\MockObject\MockObject $dispatcher;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
}
public function testProcess()
{
$now = new \DateTimeImmutable();
$job = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type = 'type')
->setOptions($options = ['option1' => 'value1'])
;
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
[
Events::AFTER_PROCESSING,
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::BEFORE_PROCESSING,
],
[
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::AFTER_PROCESSING,
],
);
}
$dataflowType = $this->createMock(DataflowTypeInterface::class);
$this->registry
->expects($this->once())
->method('getDataflowType')
->with($type)
->willReturn($dataflowType)
;
$bag = [new \Exception('message1')];
$result = new Result('name', new \DateTimeImmutable(), $end = new \DateTimeImmutable(), $count = 10, $bag);
$dataflowType
->expects($this->once())
->method('process')
->with($options)
->willReturn($result)
;
$this->repository
->expects($this->exactly(2))
->method('save')
;
$this->processor->process($job);
$this->assertGreaterThanOrEqual($now, $job->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job->getStatus());
$this->assertSame($end, $job->getEndTime());
$this->assertSame($count - count($bag), $job->getCount());
}
}

View File

@@ -10,8 +10,7 @@ use PHPUnit\Framework\TestCase;
class DataflowTypeRegistryTest extends TestCase
{
/** @var DataflowTypeRegistry */
private $registry;
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry $registry;
protected function setUp(): void
{
@@ -33,7 +32,7 @@ class DataflowTypeRegistryTest extends TestCase
$this->registry->registerDataflowType($type);
$this->assertSame($type, $this->registry->getDataflowType(get_class($type)));
$this->assertSame($type, $this->registry->getDataflowType($type::class));
$this->assertSame($type, $this->registry->getDataflowType($alias1));
$this->assertSame($type, $this->registry->getDataflowType($alias2));
$this->assertContains($type, $this->registry->listDataflowTypes());

View File

@@ -0,0 +1,65 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunnerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner $runner;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \Symfony\Component\Messenger\MessageBusInterface|\PHPUnit\Framework\MockObject\MockObject $bus;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->bus = $this->createMock(MessageBusInterface::class);
$this->runner = new MessengerDataflowRunner($this->repository, $this->bus);
}
public function testRunPendingDataflows()
{
$job1 = (new Job())->setId($id1 = 10);
$job2 = (new Job())->setId($id2 = 20);
$this->repository
->expects($this->exactly(3))
->method('findNextPendingDataflow')
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$this->repository
->expects($this->exactly(2))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->bus
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive([
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id1)
], [
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id2)
])
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),
new Envelope(new JobMessage($id2))
)
;
$this->runner->runPendingDataflows();
$this->assertSame(Job::STATUS_QUEUED, $job1->getStatus());
$this->assertSame(Job::STATUS_QUEUED, $job2->getStatus());
}
}

View File

@@ -2,55 +2,33 @@
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use Doctrine\ORM\EntityManagerInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunnerTest extends TestCase
{
/** @var PendingDataflowRunner */
private $runner;
private \CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner $runner;
/** @var JobRepository|MockObject */
private $repository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->runner = new PendingDataflowRunner($this->repository, $this->registry, $this->dispatcher);
$this->runner = new PendingDataflowRunner($this->repository, $this->processor);
}
public function testRunPendingDataflows()
{
$now = new \DateTime();
$job1 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type1 = 'type1')
->setOptions($options1 = ['option1' => 'value1'])
;
$job2 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type2 = 'type2')
->setOptions($options2 = ['option2' => 'value2'])
;
$job1 = new Job();
$job2 = new Job();
$this->repository
->expects($this->exactly(3))
@@ -58,113 +36,12 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::AFTER_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::AFTER_PROCESSING,
]
);
}
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);
$this->registry
$this->processor
->expects($this->exactly(2))
->method('getDataflowType')
->withConsecutive([$type1], [$type2])
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
;
$bag1 = [new \Exception('message1')];
$bag2 = [new \Exception('message2')];
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
$dataflowType1
->expects($this->once())
->method('process')
->with($options1)
->willReturn($result1)
;
$dataflowType2
->expects($this->once())
->method('process')
->with($options2)
->willReturn($result2)
;
$this->repository
->expects($this->exactly(4))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->runner->runPendingDataflows();
$this->assertGreaterThanOrEqual($now, $job1->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job1->getStatus());
$this->assertSame($end1, $job1->getEndTime());
$this->assertSame($count1 - count($bag1), $job1->getCount());
$this->assertGreaterThanOrEqual($now, $job2->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job2->getStatus());
$this->assertSame($end2, $job2->getEndTime());
$this->assertSame($count2 - count($bag2), $job2->getCount());
}
}

View File

@@ -41,27 +41,33 @@
}
},
"require": {
"php": "^7.1",
"php": "^8.0",
"ext-json": "*",
"doctrine/dbal": "^2.0",
"doctrine/dbal": "^2.12||^3.0",
"doctrine/doctrine-bundle": "^1.0||^2.0",
"psr/log": "^1.1",
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0"
"monolog/monolog": "^1.0||^2.0",
"psr/log": "^1.1||^2.0||^3.0",
"symfony/config": "^3.4||^4.0||^5.0||^6.0",
"symfony/console": "^3.4||^4.0||^5.0||^6.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0||^6.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0||^6.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0||^6.0",
"symfony/lock": "^3.4||^4.0||^5.0||^6.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0||^6.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0||^6.0",
"symfony/validator": "^3.4||^4.0||^5.0||^6.0",
"symfony/yaml": "^3.4||^4.0||^5.0||^6.0"
},
"require-dev": {
"phpunit/phpunit": "^7||^8"
"amphp/amp": "^2.5",
"phpunit/phpunit": "^7||^8||^9",
"rector/rector": "^0.13.10",
"symfony/messenger": "^4.4||^5.0||^6.0"
},
"suggest": {
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows."
"amphp/amp": "Provide asynchronous steps for your dataflows",
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows.",
"symfony/messenger": "Allows messenger mode, i.e. letting workers run jobs"
},
"config": {
"sort-packages": true

View File

@@ -1,32 +1,21 @@
<?xml version = '1.0' encoding = 'UTF-8'?>
<?xml version="1.0" encoding="UTF-8"?>
<!-- http://www.phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="Tests/bootstrap.php"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
colors="false"
>
<php>
<ini name="error_reporting" value="-1" />
</php>
<testsuites>
<testsuite name="Dataflow tests suite">
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory>./src/</directory>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</whitelist>
</filter>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" backupStaticAttributes="false" bootstrap="Tests/bootstrap.php" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" colors="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<coverage>
<include>
<directory>./src/</directory>
</include>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</coverage>
<php>
<ini name="error_reporting" value="-1"/>
</php>
<testsuites>
<testsuite name="Dataflow tests suite">
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>
</phpunit>

25
rector.php Normal file
View File

@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
use Rector\Symfony\Set\SymfonySetList;
return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__ . '/src',
__DIR__ . '/Tests',
]);
// register a single rule
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
$rectorConfig->sets([
SymfonySetList::SYMFONY_60,
SymfonySetList::SYMFONY_CODE_QUALITY,
SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION,
LevelSetList::UP_TO_PHP_80,
]);
};

4
sonar-project.properties Normal file
View File

@@ -0,0 +1,4 @@
sonar.projectKey=code-rhapsodie_dataflow-bundle_AYvYuaAwWE9sbcQmD1vw
sonar.sources=src
sonar.tests=Tests

View File

@@ -5,9 +5,11 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle;
use CodeRhapsodie\DataflowBundle\DependencyInjection\CodeRhapsodieDataflowExtension;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\BusCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\ExtensionInterface;
use Symfony\Component\HttpKernel\Bundle\Bundle;
/**
@@ -17,7 +19,7 @@ class CodeRhapsodieDataflowBundle extends Bundle
{
protected $name = 'CodeRhapsodieDataflowBundle';
public function getContainerExtension()
public function getContainerExtension(): ?ExtensionInterface
{
return new CodeRhapsodieDataflowExtension();
}
@@ -27,6 +29,7 @@ class CodeRhapsodieDataflowBundle extends Bundle
$container
->addCompilerPass(new DataflowTypeCompilerPass())
->addCompilerPass(new DefaultLoggerCompilerPass())
->addCompilerPass(new BusCompilerPass())
;
}
}

View File

@@ -22,24 +22,9 @@ class AddScheduledDataflowCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ValidatorInterface */
private $validator;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator, ConnectionFactory $connectionFactory)
public function __construct(private DataflowTypeRegistryInterface $registry, private ScheduledDataflowRepository $scheduledDataflowRepository, private ValidatorInterface $validator, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->validator = $validator;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -63,7 +48,7 @@ class AddScheduledDataflowCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
@@ -108,7 +93,7 @@ class AddScheduledDataflowCommand extends Command
'id' => null,
'label' => $label,
'dataflow_type' => $type,
'options' => json_decode($options, true),
'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
'frequency' => $frequency,
'next' => new \DateTimeImmutable($firstRun),
'enabled' => $enabled,

View File

@@ -21,18 +21,9 @@ class ChangeScheduleStatusCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -52,7 +43,7 @@ class ChangeScheduleStatusCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -8,7 +8,6 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -27,18 +26,9 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -62,13 +52,13 @@ EOF
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);
$options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);

View File

@@ -26,18 +26,9 @@ class JobShowCommand extends Command
protected static $defaultName = 'code-rhapsodie:dataflow:job:show';
/** @var JobRepository */
private $jobRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(JobRepository $jobRepository, ConnectionFactory $connectionFactory)
public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->jobRepository = $jobRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -57,7 +48,7 @@ class JobShowCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
@@ -97,21 +88,19 @@ class JobShowCommand extends Command
['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
['Object number', $job->getCount()],
['Errors', count($job->getExceptions())],
['Errors', count((array) $job->getExceptions())],
['Status', $this->translateStatus($job->getStatus())],
];
if ($input->getOption('details')) {
$display[] = ['Type', $job->getDataflowType()];
$display[] = ['Options', json_encode($job->getOptions())];
$display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)];
$io->section('Summary');
}
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(function (string $exception) {
return substr($exception, 0, 900).'…';
}, $job->getExceptions());
$exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$io->write($exceptions);
}

View File

@@ -24,22 +24,9 @@ class RunPendingDataflowsCommand extends Command
protected static $defaultName = 'code-rhapsodie:dataflow:run-pending';
/** @var ScheduledDataflowManagerInterface */
private $manager;
/** @var PendingDataflowRunnerInterface */
private $runner;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowManagerInterface $manager, private PendingDataflowRunnerInterface $runner, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->manager = $manager;
$this->runner = $runner;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -59,7 +46,7 @@ EOF
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');

View File

@@ -19,18 +19,9 @@ class ScheduleListCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -47,7 +38,7 @@ class ScheduleListCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -23,14 +23,9 @@ class SchemaCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:dump-schema';
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ConnectionFactory $connectionFactory)
public function __construct(private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->connectionFactory = $connectionFactory;
}
/**
@@ -49,7 +44,7 @@ class SchemaCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -0,0 +1,72 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
public function __construct(protected ?int $loopInterval = 0, protected ?int $emitInterval = 0)
{
}
private ?string $name = null;
private ?iterable $reader = null;
private array $steps = [];
/** @var WriterInterface[] */
private array $writers = [];
public function setName(string $name): self
{
$this->name = $name;
return $this;
}
public function setReader(iterable $reader): self
{
$this->reader = $reader;
return $this;
}
public function addStep(callable $step, int $priority = 0, int $scale = 1): self
{
$this->steps[$priority][] = ['step' => $step, 'scale' => $scale];
return $this;
}
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new AMPAsyncDataflow($this->reader, $this->name, $this->loopInterval, $this->emitInterval);
krsort($this->steps);
foreach ($this->steps as $stepArray) {
foreach ($stepArray as $step) {
$dataflow->addStep($step['step'], $step['scale']);
}
}
foreach ($this->writers as $writer) {
$dataflow->addWriter($writer);
}
return $dataflow;
}
}

View File

@@ -27,9 +27,8 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
$this->configureOptions($optionsResolver);
$options = $optionsResolver->resolve($options);
$builder = (new DataflowBuilder())
->setName($this->getLabel())
;
$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
@@ -39,6 +38,11 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
return $dataflow->process();
}
protected function createDataflowBuilder(): DataflowBuilder
{
return new DataflowBuilder();
}
/**
* @codeCoverageIgnore
*/

View File

@@ -0,0 +1,164 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use function Amp\coroutine;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use function Amp\Promise\wait;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use RuntimeException;
use Throwable;
class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var callable[] */
private array $steps = [];
/** @var WriterInterface[] */
private array $writers = [];
private array $states = [];
private array $stepsJobs = [];
public function __construct(private iterable $reader, private ?string $name, private ?int $loopInterval = 0, private ?int $emitInterval = 0)
{
if (!function_exists('Amp\\Promise\\wait')) {
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}
}
/**
* @param int $scale
*
* @return $this
*/
public function addStep(callable $step, $scale = 1): self
{
$this->steps[] = [$step, $scale];
return $this;
}
/**
* @return $this
*/
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
/**
* {@inheritdoc}
*/
public function process(): Result
{
$count = 0;
$exceptions = [];
$startTime = new \DateTimeImmutable();
try {
foreach ($this->writers as $writer) {
$writer->prepare();
}
$deferred = new Deferred();
$resolved = false; //missing $deferred->isResolved() in version 2.5
$producer = new Producer(function (callable $emit) {
foreach ($this->reader as $index => $item) {
yield new Delayed($this->emitInterval);
yield $emit([$index, $item]);
}
});
$watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$exceptions) {
if (yield $producer->advance()) {
$it = $producer->getCurrent();
[$index, $item] = $it;
$this->states[$index] = [$index, 0, $item];
} elseif (!$resolved && 0 === count($this->states)) {
$resolved = true;
$deferred->resolve();
}
foreach ($this->states as $state) {
$this->processState($state, $count, $exceptions);
}
});
wait($deferred->promise());
Loop::cancel($watcherId);
foreach ($this->writers as $writer) {
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
$this->logException($e);
}
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
* @param int $count internal count reference
* @param array $exceptions internal exceptions
*/
private function processState(mixed $state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < count($this->steps)) {
if (!isset($this->stepsJobs[$stepIndex])) {
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);
$promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
if ($exception) {
$exceptions[$stepIndex] = $exception;
$this->logException($exception, (string) $stepIndex);
} elseif (false === $newItem) {
unset($this->states[$readIndex]);
} else {
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
}
unset($this->stepsJobs[$stepIndex][$readIndex]);
});
}
} else {
unset($this->states[$readIndex]);
foreach ($this->writers as $writer) {
$writer->write($item);
}
++$count;
}
}
private function logException(Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
}
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
}
}

View File

@@ -13,24 +13,16 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var callable[] */
private $steps;
private array $steps = [];
/** @var WriterInterface[] */
private $writers;
private array $writers = [];
public function __construct(iterable $reader, ?string $name)
private ?\Closure $customExceptionIndex = null;
public function __construct(private iterable $reader, private ?string $name)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
}
/**
@@ -53,6 +45,16 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
return $this;
}
/**
* @return $this
*/
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
/**
* {@inheritdoc}
*/
@@ -71,8 +73,17 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
$this->logException($e, (string) $index);
$exceptionIndex = $index;
try {
if (is_callable($this->customExceptionIndex)) {
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
$exceptions[$index] = $e2;
$this->logException($e2, $index);
}
$exceptions[$exceptionIndex] = $e;
$this->logException($e, $exceptionIndex);
}
++$count;
@@ -89,10 +100,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
* @param mixed $item
*/
private function processItem($item): void
private function processItem(mixed $item): void
{
foreach ($this->steps as $step) {
$item = call_user_func($step, $item);
@@ -107,7 +115,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
}
}
private function logException(\Throwable $e, ?string $index = null): void
private function logException(\Throwable $e, string|int|null $index = null): void
{
if (!isset($this->logger)) {
return;

View File

@@ -10,17 +10,16 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
class DataflowBuilder
{
/** @var string */
private $name;
private ?string $name = null;
/** @var iterable */
private $reader;
private ?iterable $reader = null;
/** @var array */
private $steps = [];
private array $steps = [];
/** @var WriterInterface[] */
private $writers = [];
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
public function setName(string $name): self
{
@@ -50,6 +49,13 @@ class DataflowBuilder
return $this;
}
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new Dataflow($this->reader, $this->name);
@@ -65,6 +71,10 @@ class DataflowBuilder
$dataflow->addWriter($writer);
}
if (is_callable($this->customExceptionIndex)) {
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
}
return $dataflow;
}
}

View File

@@ -9,39 +9,19 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType;
*/
class Result
{
/** @var string */
private $name;
private \DateInterval $elapsed;
/** @var \DateTimeInterface */
private $startTime;
private int $errorCount = 0;
/** @var \DateTimeInterface */
private $endTime;
private int $successCount = 0;
/** @var \DateInterval */
private $elapsed;
private array $exceptions;
/** @var int */
private $errorCount = 0;
/** @var int */
private $successCount = 0;
/** @var int */
private $totalProcessedCount = 0;
/** @var array */
private $exceptions;
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
{
$this->name = $name;
$this->startTime = $startTime;
$this->endTime = $endTime;
$this->elapsed = $startTime->diff($endTime);
$this->totalProcessedCount = $totalCount;
$this->errorCount = count($exceptions);
$this->successCount = $totalCount - $this->errorCount;
$this->successCount = $totalProcessedCount - $this->errorCount;
$this->exceptions = $exceptions;
}

View File

@@ -11,15 +11,11 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
*/
class CollectionWriter implements DelegateWriterInterface
{
/** @var WriterInterface */
private $writer;
/**
* CollectionWriter constructor.
*/
public function __construct(WriterInterface $writer)
public function __construct(private WriterInterface $writer)
{
$this->writer = $writer;
}
/**
@@ -36,7 +32,7 @@ class CollectionWriter implements DelegateWriterInterface
public function write($collection)
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', is_object($collection) ? get_class($collection) : gettype($collection)));
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
}
foreach ($collection as $item) {

View File

@@ -12,14 +12,13 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
class DelegatorWriter implements DelegateWriterInterface
{
/** @var DelegateWriterInterface[] */
private $delegates;
private array $delegates = [];
/**
* DelegatorWriter constructor.
*/
public function __construct()
{
$this->delegates = [];
}
/**
@@ -47,7 +46,7 @@ class DelegatorWriter implements DelegateWriterInterface
return;
}
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', is_object($item) ? get_class($item) : gettype($item)));
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
}
/**

View File

@@ -6,12 +6,8 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
class PortWriterAdapter implements WriterInterface
{
/** @var \Port\Writer */
private $writer;
public function __construct(\Port\Writer $writer)
public function __construct(private \Port\Writer $writer)
{
$this->writer = $writer;
}
public function prepare()

View File

@@ -16,10 +16,8 @@ interface WriterInterface
/**
* Write an item.
*
* @param mixed $item
*/
public function write($item);
public function write(mixed $item);
/**
* Called after the dataflow is processed.

View File

@@ -29,5 +29,10 @@ class CodeRhapsodieDataflowExtension extends Extension
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
$loader->load('messenger_services.yaml');
}
}
}

View File

@@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;
class BusCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;
}
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
if (!$container->has($bus)) {
throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
}
if (!$container->has(MessengerDataflowRunner::class)) {
return;
}
$definition = $container->findDefinition(MessengerDataflowRunner::class);
$definition->setArgument('$bus', new Reference($bus));
}
}

View File

@@ -5,7 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
@@ -22,7 +22,7 @@ class DefaultLoggerCompilerPass implements CompilerPassInterface
return;
}
foreach ([ExecuteDataflowCommand::class, PendingDataflowRunner::class] as $serviceId) {
foreach ([ExecuteDataflowCommand::class, JobProcessor::class] as $serviceId) {
if (!$container->has($serviceId)) {
continue;
}

View File

@@ -6,10 +6,11 @@ namespace CodeRhapsodie\DataflowBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Messenger\MessageBusInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder()
public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
{
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
if (method_exists($treeBuilder, 'getRootNode')) {
@@ -27,6 +28,21 @@ class Configuration implements ConfigurationInterface
->scalarNode('default_logger')
->defaultValue('logger')
->end()
->arrayNode('messenger_mode')
->addDefaultsIfNotSet()
->children()
->booleanNode('enabled')
->defaultFalse()
->end()
->scalarNode('bus')
->defaultValue('messenger.default_bus')
->end()
->end()
->validate()
->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->end()
;

View File

@@ -13,9 +13,10 @@ use Symfony\Component\Validator\Constraints as Asserts;
*/
class Job
{
const STATUS_PENDING = 0;
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
public const STATUS_PENDING = 0;
public const STATUS_RUNNING = 1;
public const STATUS_COMPLETED = 2;
public const STATUS_QUEUED = 3;
private const KEYS = [
'id',
@@ -31,74 +32,35 @@ class Job
'end_time',
];
/**
* @var int|null
*/
private $id;
private ?int $id = null;
/**
* @var int
*
* @Asserts\Range(min=0, max=2)
*/
private $status;
#[Asserts\Range(min: 0, max: 2)]
private int $status = self::STATUS_PENDING;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:] ]+\z#u')]
private ?string $label = null;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:]\\\]+\z#u')]
private ?string $dataflowType = null;
/**
* @var array|null
*/
private $options;
private ?array $options = null;
/**
* @var \DateTimeInterface|null
*/
private $requestedDate;
private ?\DateTimeInterface $requestedDate = null;
/**
* @var int|null
*/
private $scheduledDataflowId;
private ?int $scheduledDataflowId = null;
/**
* @var int|null
*/
private $count;
private ?int $count = 0;
/**
* @var array|null
*/
private $exceptions;
private ?array $exceptions = null;
/**
* @var \DateTimeInterface|null
*/
private $startTime;
private ?\DateTimeInterface $startTime = null;
/**
* @var \DateTimeInterface|null
*/
private $endTime;
private ?\DateTimeInterface $endTime = null;
/**
* @return Job
*/
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
{
return (new static())
@@ -112,8 +74,6 @@ class Job
public function __construct()
{
$this->count = 0;
$this->status = static::STATUS_PENDING;
}
public static function createFromArray(array $datas)

View File

@@ -14,7 +14,7 @@ use Symfony\Component\Validator\Constraints as Asserts;
*/
class ScheduledDataflow
{
const AVAILABLE_FREQUENCIES = [
public const AVAILABLE_FREQUENCIES = [
'1 hour',
'1 day',
'1 week',
@@ -23,51 +23,29 @@ class ScheduledDataflow
private const KEYS = ['id', 'label', 'dataflow_type', 'options', 'frequency', 'next', 'enabled'];
/**
* @var int|null
*/
private $id;
private ?int $id = null;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:] ]+\z#u')]
private ?string $label = null;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:]\\\]+\z#u')]
private ?string $dataflowType = null;
private ?array $options = null;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*/
private $options;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Frequency()
*/
private $frequency;
#[Asserts\NotBlank]
private ?string $frequency = null;
/**
* @var \DateTimeInterface|null
*/
private $next;
private ?\DateTimeInterface $next = null;
/**
* @var bool|null
*/
private $enabled;
private ?bool $enabled = null;
public static function createFromArray(array $datas)
{

View File

@@ -7,7 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Event;
/*
* @codeCoverageIgnore
*/
if (class_exists('Symfony\Contracts\EventDispatcher\Event')) {
if (class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
// For Symfony 5.0+
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
{

View File

@@ -6,6 +6,6 @@ namespace CodeRhapsodie\DataflowBundle\Event;
final class Events
{
const BEFORE_PROCESSING = 'coderhapsodie.dataflow.before_processing';
const AFTER_PROCESSING = 'coderhapsodie.dataflow.after_processing';
public const BEFORE_PROCESSING = 'coderhapsodie.dataflow.before_processing';
public const AFTER_PROCESSING = 'coderhapsodie.dataflow.after_processing';
}

View File

@@ -13,15 +13,11 @@ use CodeRhapsodie\DataflowBundle\Entity\Job;
*/
class ProcessingEvent extends CrEvent
{
/** @var Job */
private $job;
/**
* ProcessingEvent constructor.
*/
public function __construct(Job $job)
public function __construct(private Job $job)
{
$this->job = $job;
}
public function getJob(): Job

View File

@@ -13,14 +13,8 @@ use Symfony\Component\DependencyInjection\Container;
*/
class ConnectionFactory
{
private $connectionName;
private $container;
public function __construct(Container $container, string $connectionName)
public function __construct(private Container $container, private string $connectionName)
{
$this->connectionName = $connectionName;
$this->container = $container;
}
public function setConnectionName(string $connectionName)
@@ -28,7 +22,7 @@ class ConnectionFactory
$this->connectionName = $connectionName;
}
public function getConnection(): \Doctrine\DBAL\Driver\Connection
public function getConnection(): \Doctrine\DBAL\Connection
{
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}

View File

@@ -13,13 +13,11 @@ class BufferHandler extends AbstractProcessingHandler
{
private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n";
private $buffer;
private array $buffer = [];
public function __construct($level = Logger::DEBUG, bool $bubble = true)
{
parent::__construct($level, $bubble);
$this->buffer = [];
}
public function clearBuffer(): array

View File

@@ -10,20 +10,20 @@ use Psr\Log\LoggerInterface;
final class DelegatingLogger extends AbstractLogger
{
/** @var LoggerInterface[] */
private $loggers;
private ?array $loggers = null;
public function __construct(iterable $loggers)
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, get_class($logger)));
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
}
$this->loggers[] = $logger;
}
}
public function log($level, $message, array $context = [])
public function log($level, $message, array $context = []): void
{
foreach ($this->loggers as $logger) {
$logger->log($level, $message, $context);

View File

@@ -8,27 +8,15 @@ use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
/**
* Handles scheduled dataflows execution dates based on their frequency.
*/
class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
{
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var JobRepository */
private $jobRepository;
/** @var Connection */
private $connection;
public function __construct(Connection $connection, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
public function __construct(private Connection $connection, private ScheduledDataflowRepository $scheduledDataflowRepository, private JobRepository $jobRepository)
{
$this->connection = $connection;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->jobRepository = $jobRepository;
}
/**

View File

@@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
class JobMessage
{
public function __construct(private int $jobId)
{
}
public function getJobId(): int
{
return $this->jobId;
}
}

View File

@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class JobMessageHandler implements MessageSubscriberInterface
{
public function __construct(private JobRepository $repository, private JobProcessorInterface $processor)
{
}
public function __invoke(JobMessage $message)
{
$this->processor->process($this->repository->find($message->getJobId()));
}
public static function getHandledMessages(): iterable
{
return [JobMessage::class];
}
}

View File

@@ -0,0 +1,87 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
{
}
public function process(Job $job): void
{
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}
}

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\Entity\Job;
interface JobProcessorInterface
{
public function process(Job $job): void;
}

View File

@@ -13,10 +13,10 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
class DataflowTypeRegistry implements DataflowTypeRegistryInterface
{
/** @var array|DataflowTypeInterface[] */
private $fqcnRegistry = [];
private array $fqcnRegistry = [];
/** @var array|DataflowTypeInterface[] */
private $aliasesRegistry = [];
private array $aliasesRegistry = [];
/**
* {@inheritdoc}
@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw UnknownDataflowTypeException::create($fqcnOrAlias, array_merge(array_keys($this->fqcnRegistry), array_keys($this->aliasesRegistry)));
throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]);
}
/**
@@ -47,7 +47,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
*/
public function registerDataflowType(DataflowTypeInterface $dataflowType): void
{
$this->fqcnRegistry[get_class($dataflowType)] = $dataflowType;
$this->fqcnRegistry[$dataflowType::class] = $dataflowType;
foreach ($dataflowType->getAliases() as $alias) {
$this->aliasesRegistry[$alias] = $dataflowType;
}

View File

@@ -38,7 +38,7 @@ trait InitFromDbTrait
return [];
}
$array = json_decode($value, true);
$array = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
return (false === $array) ? [] : $array;
}

View File

@@ -6,7 +6,8 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
/**
@@ -21,37 +22,31 @@ class JobRepository
public const TABLE_NAME = 'cr_dataflow_job';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'status' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'id' => ParameterType::INTEGER,
'status' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => \PDO::PARAM_INT,
'count' => \PDO::PARAM_INT,
'exceptions' => \PDO::PARAM_STR,
'scheduled_dataflow_id' => ParameterType::INTEGER,
'count' => ParameterType::INTEGER,
'exceptions' => ParameterType::STRING,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
public function __construct(private Connection $connection)
{
$this->connection = $connection;
}
public function find(int $jobId)
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, ParameterType::INTEGER)))
;
return $this->returnFirstOrNull($qb);
@@ -62,12 +57,12 @@ class JobRepository
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -76,8 +71,8 @@ class JobRepository
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), ParameterType::INTEGER)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
return $this->returnFirstOrNull($qb);
}
@@ -86,7 +81,7 @@ class JobRepository
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('requested_date', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)))
->orderBy('requested_date', 'ASC')
->setMaxResults(1)
;
@@ -97,7 +92,7 @@ class JobRepository
public function findLastForDataflowId(int $dataflowId): ?Job
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(1)
;
@@ -115,7 +110,7 @@ class JobRepository
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($row);
}
}
@@ -123,14 +118,14 @@ class JobRepository
public function findForScheduled(int $id): iterable
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($row);
}
}
@@ -141,10 +136,10 @@ class JobRepository
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
}
if (is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions']);
$datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR);
}
if (null === $job->getId()) {
@@ -172,6 +167,6 @@ class JobRepository
return null;
}
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
}

View File

@@ -5,7 +5,8 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
/**
@@ -20,25 +21,20 @@ class ScheduledDataflowRepository
public const TABLE_NAME = 'cr_dataflow_scheduled';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'frequency' => \PDO::PARAM_STR,
'id' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'frequency' => ParameterType::STRING,
'next' => 'datetime',
'enabled' => \PDO::PARAM_BOOL,
'enabled' => ParameterType::BOOLEAN,
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
public function __construct(private Connection $connection)
{
$this->connection = $connection;
}
/**
@@ -52,13 +48,13 @@ class ScheduledDataflowRepository
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->orderBy('next', 'ASC')
;
;
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -66,7 +62,7 @@ class ScheduledDataflowRepository
public function find(int $scheduleId): ?ScheduledDataflow
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, ParameterType::INTEGER)))
->setMaxResults(1)
;
@@ -82,8 +78,8 @@ class ScheduledDataflowRepository
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
while (false !== ($row = $stmt->fetchAssociative())) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -96,7 +92,7 @@ class ScheduledDataflowRepository
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
return $query->execute()->fetchAllAssociative();
}
public function save(ScheduledDataflow $scheduledDataflow)
@@ -105,7 +101,7 @@ class ScheduledDataflowRepository
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
}
if (null === $scheduledDataflow->getId()) {
@@ -147,6 +143,6 @@ class ScheduledDataflowRepository
return null;
}
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
}

View File

@@ -0,0 +1,12 @@
services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$bus: ~ # Filled in compiler pass
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
tags: ['messenger.message_handler']

View File

@@ -76,6 +76,12 @@ services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessor'
CodeRhapsodie\DataflowBundle\Processor\JobProcessor:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'

View File

@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunner implements PendingDataflowRunnerInterface
{
public function __construct(private JobRepository $repository, private MessageBusInterface $bus)
{
}
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->bus->dispatch(new JobMessage($job->getId()));
$job->setStatus(Job::STATUS_QUEUED);
$this->repository->save($job);
}
}
}

View File

@@ -4,38 +4,13 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwareInterface
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(private JobRepository $repository, private JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
}
/**
@@ -44,62 +19,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwa
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
$this->processor->process($job);
}
}
}

View File

@@ -48,6 +48,7 @@ class DataflowSchemaProvider
$tableSchedule->addColumn('enabled', 'boolean', ['notnull' => true]);
$tableJob->addForeignKeyConstraint($tableSchedule, ['scheduled_dataflow_id'], ['id']);
$tableJob->addIndex(['status'], 'idx_status');
return $schema;
}