28 Commits

Author SHA1 Message Date
jeremycr
a5518c80e2 Added more output when errors occured during execute command (#49)
* Added more output when errors occured during execute command

* Apply suggestions from code review

Co-Authored-By: jbcr <51637606+jbcr@users.noreply.github.com>
2020-02-27 09:57:42 +01:00
jeremycr
bd9171ad53 Added semi-colon after each query to ease copy-paste (#48) 2020-02-14 14:37:18 +01:00
jeremycr
5b4b3f1b6f Fixed the connection proxy class created by the factory (#47) 2020-01-30 14:37:47 +01:00
jbcr
d1330ae638 display errors catched during processing (#44)
* display errors catched during processing
* update changelog
2019-12-04 15:37:24 +01:00
jbcr
42b242ee6c catch all error (#43) 2019-12-04 15:37:00 +01:00
jeremycr
4d98adfe0a Updated composer aliases (#40) 2019-11-29 09:10:16 +01:00
jeremycr
c5fc6adf08 Passed PHP-CS-Fixer 2.16.1 (#39) 2019-11-28 14:38:16 +01:00
jeremycr
8efb4bd2d9 Removed unused dependency (#38) 2019-11-28 14:36:31 +01:00
jbcr
a9b19d933a Add Symfony 5.0 compatibility (#35)
* update travis config
* add doctrine dbal 2 in requirement
* remove php cs fixer. Use a global installed CS Fixer
* Add backward compatibility
* add coverage for SF5.0
* add tests
* code coverage on lowest version
* code coverage on 3.4 version
2019-11-26 14:25:05 +01:00
jeremycr
ca946429b1 Fixed next execution for scheduled dataflows not increasing (#36) 2019-11-22 14:12:04 +01:00
jeremycr
26ac98eb98 Added CollectionWriter and DelegatorWriter (#34)
* Added CollectionWriter and DelegatorWriter

* Added explanations and examples in README
2019-11-21 11:47:21 +01:00
jbcr
d18494212d fix example (#33)
* fix example
2019-11-19 08:18:56 +01:00
Olivier PORTIER
fbc4a20b57 Mise à jour de la commande d'installation code-rhapsodie/dataflow-bundle (#31) 2019-11-18 16:47:30 +01:00
jbcr
099cdd6579 replace dependency to doctrine/orm by doctrine/dbal (#30) 2019-11-08 16:39:09 +01:00
jbcr
206eeae297 add depentency to doctrine/doctrine-bundle (#29) 2019-11-08 14:57:52 +01:00
jbcr
c85c74fe7a make coderhapsodie.dataflow.connection as alias (#28) 2019-11-08 14:10:50 +01:00
jbcr
015e25beff fix command dependency injection (#27) 2019-11-08 11:18:22 +01:00
jbcr
c1c8db7105 fix json_decode on null (#26)
* fix json_decode on null
2019-11-08 10:44:12 +01:00
jbcr
d10642add7 add docs for v2 #21 (#24)
* add docs for v2 #21
Co-Authored-By: jeremycr <32451794+jeremycr@users.noreply.github.com>
2019-11-08 10:43:54 +01:00
jbcr
318f844ccf refactor getQueryBuilder and change visibility (#25)
* refactor getQueryBuilder and change visibility
2019-11-08 08:27:17 +01:00
jbcr
1eedeceef8 Add schema provider (#23)
add schema provider and schema command #23
2019-11-07 16:04:54 +01:00
jbcr
164e68c8ef add connection option for all command #15 (#22)
add connection option for all command #15
2019-11-07 14:34:28 +01:00
jbcr
f444d4d8c0 add factory to determine the current connexion #14 (#20)
* add factory to determine the current connexion #14
2019-11-07 11:49:00 +01:00
jbcr
e78a918af1 add configuration #12 (#18)
* add configuration #12
2019-11-07 11:23:21 +01:00
jbcr
be4cfd00a1 remove ORM and rewrite repository (#17)
remove ORM and rewrite repository #17
2019-11-07 10:51:18 +01:00
jbcr
96dcf8935d change branch alias to start v2 dev (#10) 2019-11-05 15:44:55 +01:00
jeremycr
23801f8785 Added tests (#9) 2019-10-24 15:14:43 +02:00
jbcr
689d79535c add schema and some explaining text (#8)
* add schema and some explaining text

* Apply suggestions from code review

Co-Authored-By: jeremycr <32451794+jeremycr@users.noreply.github.com>

* fix doc

* Apply suggestions from code review

Co-Authored-By: SofLesc <SofLesc@users.noreply.github.com>

* fix recomends
2019-10-24 15:14:19 +02:00
47 changed files with 1774 additions and 522 deletions

View File

@@ -30,6 +30,7 @@ matrix:
# Enable code coverage with the latest supported PHP version
- php: '7.3'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
@@ -47,10 +48,15 @@ matrix:
- SYMFONY_VERSION=3.4.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
- SYMFONY_VERSION=4.3.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.3.*
- SYMFONY_VERSION=4.4.*
- php: '7.2'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.0.*
# Test unsupported versions of Symfony
- php: '7.1'
@@ -59,13 +65,16 @@ matrix:
- php: '7.1'
env:
- SYMFONY_VERSION=4.1.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
- php: '7.1'
- php: '7.2'
env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
- SYMFONY_VERSION=5.1.*
# Test upcoming PHP versions with dev dependencies
- php: '7.4snapshot'
@@ -78,12 +87,14 @@ matrix:
- SYMFONY_VERSION=4.0.*
- env:
- SYMFONY_VERSION=4.1.*
- env:
- SYMFONY_VERSION=4.2.*
- env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
- env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
- SYMFONY_VERSION=5.1.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
@@ -108,4 +119,4 @@ script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;
fi;

38
CHANGELOG.md Normal file
View File

@@ -0,0 +1,38 @@
# Version 2.1.0
* Added CollectionWriter and DelegatorWriter
* Adding Symfony 5.0 compatibility
* Save all exceptions caught in the log for `code-rhapsodie:dataflow:execute`
* Added more output when errors occured during `code-rhapsodie:dataflow:execute`
# Version 2.0.2
* Fixed the connection proxy class created by the factory
# Version 2.0.1
* Fixed next execution time not increasing for scheduled dataflows
# Version 2.0.0
* Add Doctrine DBAL multi-connection support
* Add configuration to define the default Doctrine DBAL connection
* Remove Doctrine ORM
* Rewrite repositories
# Version 1.0.1
* Fix lost dependency
* Fix schedule removing
# Version 1.0.0
Initial version
* Define and configure a Dataflow
* Run the Job scheduled
* Run one Dataflow from the command line
* Define the schedule for a Dataflow from the command line
* Enable/Disable a scheduled Dataflow from the command line
* Display the list of scheduled Dataflow from the command line
* Display the result for the last Job for a Dataflow from the command line

227
README.md
View File

@@ -1,4 +1,4 @@
# Code-Rhapsodie Dataflow Bundle
# Code Rhapsodie Dataflow Bundle
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
@@ -7,6 +7,20 @@ providing an easy way to create import / export dataflow.
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
The steps are executed in the order in which they are added.
And, one or more writers save the row anywhere you want.
As the following schema shows, you can define more than one dataflow:
![Dataflow schema](src/Resources/doc/schema.png)
# Features
* Define and configure a Dataflow
@@ -16,6 +30,7 @@ providing an easy way to create import / export dataflow.
* Enable/Disable a scheduled Dataflow from the command line
* Display the list of scheduled Dataflow from the command line
* Display the result for the last Job for a Dataflow from the command line
* Work with multiple Doctrine DBAL connections
## Installation
@@ -25,7 +40,7 @@ providing an easy way to create import / export dataflow.
To install this bundle, run this command :
```shell script
$ composer require code-rhapsodie/dataflow
$ composer require code-rhapsodie/dataflow-bundle
```
#### Suggest
@@ -84,26 +99,32 @@ public function registerBundles()
### Update the database
This bundle uses Doctrine ORM for drive the database table for store Dataflow schedule (`cr_dataflow_scheduled`)
This bundle uses Doctrine DBAL to store Dataflow schedule into the database table (`cr_dataflow_scheduled`)
and jobs (`cr_dataflow_job`).
#### Doctrine migration
Execute the command to generate the migration for your database:
```shell script
$ bin/console doctrine:migration:diff
```
#### Other migration tools
If you use [Phinx](https://phinx.org/) or [Kaliop Migration Bundle](https://github.com/kaliop-uk/ezmigrationbundle) or whatever,
If you use [Doctrine Migration Bundle](https://symfony.com/doc/master/bundles/DoctrineMigrationsBundle/index.html) or [Phinx](https://phinx.org/)
or [Kaliop Migration Bundle](https://github.com/kaliop-uk/ezmigrationbundle) or whatever,
you can add a new migration with the generated SQL query from this command:
```shell script
$ bin/console doctrine:schema:update --dump-sql
$ bin/console code-rhapsodie:dataflow:dump-schema
```
If you have already the tables, you can add a new migration with the generated update SQL query from this command:
```shell script
$ bin/console code-rhapsodie:dataflow:dump-schema --update
```
## Configuration
By default, the Doctrine DBAL connection used is `default`. You can configure the default connection.
Add this configuration into your Symfony configuration:
```yaml
code_rhapsodie_dataflow:
dbal_default_connection: test #Name of the default connection used by Dataflow bundle
```
## Define a dataflow type
@@ -146,11 +167,11 @@ class MyFirstDataflowType extends AbstractDataflowType
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$this->myReader->setFilename($options['fileName']);
$this->myWriter->setDestinationFilePath($options['to-file']);
$builder->setReader($this->myReader)
->addStep(function($data) use ($options) {
$builder
->setReader($this->myReader->read($options['from-file']))
->addStep(function ($data) use ($options) {
// TODO : Write your code here...
return $data;
})
@@ -160,11 +181,8 @@ class MyFirstDataflowType extends AbstractDataflowType
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefaults([
'my_option' => 'my_default_value',
'fileName' => null,
]);
$optionsResolver->setRequired('fileName');
$optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
$optionsResolver->setRequired('from-file');
}
public function getLabel(): string
@@ -208,11 +226,8 @@ class MyFirstDataflowType extends AbstractDataflowType
// ...
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefaults([
'my_option' => 'my_default_value',
'fileName' => null,
]);
$optionsResolver->setRequired('fileName');
$optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
$optionsResolver->setRequired('from-file');
}
}
@@ -260,27 +275,18 @@ namespace CodeRhapsodie\DataflowExemple\Reader;
class FileReader
{
private $filename;
/**
* Set the filename option needed by the Reader.
*/
public function setFilename(string $filename) {
$this->filename = $filename;
}
public function __invoke(): iterable
public function read(string $filename): iterable
{
if (!$this->filename) {
if (!$filename) {
throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
}
if (!$fh = fopen($this->filename, 'r')) {
throw new \Exception("Unable to open file '".$this->filename."' for read.");
if (!$fh = fopen($filename, 'r')) {
throw new \Exception("Unable to open file '".$filename."' for read.");
}
while (false === ($read = fread($fh, 1024))) {
yield explode("|", $read);
while (false !== ($read = fgets($fh))) {
yield explode('|', trim($read));
}
}
}
@@ -306,14 +312,16 @@ A *Step* can be any callable, taking the element as its argument, and returning
A few examples:
```php
$builder->addStep(function($item) {
<?php
//[...]
$builder->addStep(function ($item) {
// Titles are changed to all caps before export
$item['title'] = strtoupper($item['title']);
return $item;
});
$builder->addStep(function($item) {
$builder->addStep(function ($item) {
// Private items are not exported
if ($item['private']) {
return false;
@@ -321,6 +329,7 @@ $builder->addStep(function($item) {
return $item;
});
//[...]
```
### Writers
@@ -348,11 +357,20 @@ class FileWriter implements WriterInterface
{
private $fh;
/** @var string */
private $path;
public function setDestinationFilePath(string $path) {
$this->path = $path;
}
public function prepare()
{
if (!$this->fh = fopen('/path/to/file', 'w')) {
throw new \Exception("Unable to open in write mode the output file.");
if (null === $this->path) {
throw new \Exception('Define the destination file name before use');
}
if (!$this->fh = fopen($this->path, 'w')) {
throw new \Exception('Unable to open in write mode the output file.');
}
}
@@ -368,6 +386,95 @@ class FileWriter implements WriterInterface
}
```
#### CollectionWriter
If you want to write multiple items from a single item read, you can use the generic `CollectionWriter`. This writer will iterate over any `iterable` it receives, and pass each item from that collection to your own writer that handles single items.
```php
$builder->addWriter(new CollectionWriter($mySingleItemWriter));
```
#### DelegatorWriter
If you want to call different writers depending on what item is read, you can use the generic `DelegatorWriter`.
As an example, let's suppose our items are arrays with the first entry being either `product` or `order`. We want to use a different writer based on that value.
First, create your writers implementing `DelegateWriterInterface` (this interface extends `WriterInterface` so your writers can still be used without the `DelegatorWriter`).
```php
<?php
namespace CodeRhapsodie\DataflowExemple\Writer;
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
class ProductWriter implements DelegateWriterInterface
{
public function supports($item): bool
{
return 'product' === reset($item);
}
public function prepare()
{
}
public function write($item)
{
// Process your product
}
public function finish()
{
}
}
```
```php
<?php
namespace CodeRhapsodie\DataflowExemple\Writer;
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
class OrderWriter implements DelegateWriterInterface
{
public function supports($item): bool
{
return 'order' === reset($item);
}
public function prepare()
{
}
public function write($item)
{
// Process your order
}
public function finish()
{
}
}
```
Then, configure your `DelegatorWriter` and add it to your dataflow type.
```php
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
// Snip add reader and steps
$delegatorWriter = new DelegatorWriter();
$delegatorWriter->addDelegate(new ProductWriter());
$delegatorWriter->addDelegate(new OrderWriter());
$builder->addWriter($delegatorWriter);
}
```
During execution, the `DelegatorWriter` will simply pass each item received to its first delegate (in the order those were added) that supports it. If no delegate supports an item, an exception will be thrown.
## Queue
All pending dataflow job processes are stored in a queue into the database.
@@ -394,6 +501,28 @@ Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:execute` Let you execute one dataflow job.
`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries
### Work with many databases
All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.
Example:
This command uses the `default` DBAL connection to generate all schema update queries.
```shell script
$ bin/console code-rhapsodie:dataflow:dump-schema --update --connection=default
```
To execute all pending job for a specific connection use:
```shell script
# Run for dataflow DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=dataflow
# Run for default DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=default
```
# Issues and feature requests

View File

@@ -0,0 +1,55 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use PHPUnit\Framework\TestCase;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AbstractDataflowTypeTest extends TestCase
{
public function testProcess()
{
$label = 'Test label';
$options = ['testOption' => 'Test value'];
$values = [1, 2, 3];
$testCase = $this;
$dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType
{
private $label;
private $options;
private $values;
private $testCase;
public function __construct(string $label, array $options, array $values, TestCase $testCase)
{
$this->label = $label;
$this->options = $options;
$this->values = $values;
$this->testCase = $testCase;
}
public function getLabel(): string
{
return $this->label;
}
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefined('testOption');
}
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$builder->setReader($this->values);
$this->testCase->assertSame($this->options, $options);
}
};
$result = $dataflowType->process($options);
$this->assertSame($label, $result->getName());
$this->assertSame(count($values), $result->getTotalProcessedCount());
}
}

View File

@@ -0,0 +1,56 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\CollectionWriter;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
use PHPUnit\Framework\TestCase;
class CollectionWriterTest extends TestCase
{
public function testNotACollection()
{
$this->expectException(UnsupportedItemTypeException::class);
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
$writer->write('Not an iterable');
}
public function testSupports()
{
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
$this->assertTrue($writer->supports([]));
$this->assertTrue($writer->supports(new \ArrayIterator([])));
$this->assertFalse($writer->supports(''));
$this->assertFalse($writer->supports(0));
}
public function testAll()
{
$values = ['a', 'b', 'c'];
$embeddedWriter = $this->createMock(WriterInterface::class);
$embeddedWriter
->expects($this->once())
->method('prepare')
;
$embeddedWriter
->expects($this->once())
->method('finish')
;
$embeddedWriter
->expects($this->exactly(count($values)))
->method('write')
->withConsecutive(...array_map(function ($item) {
return [$item];
}, $values))
;
$writer = new CollectionWriter($embeddedWriter);
$writer->prepare();
$writer->write($values);
$writer->finish();
}
}

View File

@@ -0,0 +1,160 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class DelegatorWriterTest extends TestCase
{
/** @var DelegatorWriter */
private $delegatorWriter;
/** @var DelegateWriterInterface|MockObject */
private $delegateInt;
/** @var DelegateWriterInterface|MockObject */
private $delegateString;
/** @var DelegateWriterInterface|MockObject */
private $delegateArray;
protected function setUp(): void
{
$this->delegateInt = $this->createMock(DelegateWriterInterface::class);
$this->delegateInt->method('supports')->willReturnCallback(function ($argument) {
return is_int($argument);
});
$this->delegateString = $this->createMock(DelegateWriterInterface::class);
$this->delegateString->method('supports')->willReturnCallback(function ($argument) {
return is_string($argument);
});
$this->delegateArray = $this->createMock(DelegateWriterInterface::class);
$this->delegateArray->method('supports')->willReturnCallback(function ($argument) {
return is_array($argument);
});
$this->delegatorWriter = new DelegatorWriter();
$this->delegatorWriter->addDelegates([
$this->delegateInt,
$this->delegateString,
$this->delegateArray,
]);
}
public function testUnsupported()
{
$this->expectException(UnsupportedItemTypeException::class);
$this->delegatorWriter->write(new \stdClass());
}
public function testStopAtFirstSupportingDelegate()
{
$value = 0;
$this->delegateInt->expects($this->once())->method('supports');
$this->delegateInt
->expects($this->once())
->method('write')
->with($value)
;
$this->delegateString->expects($this->never())->method('supports');
$this->delegateArray->expects($this->never())->method('supports');
$this->delegateString->expects($this->never())->method('write');
$this->delegateArray->expects($this->never())->method('write');
$this->delegatorWriter->write($value);
}
public function testNotSupported()
{
$value = new \stdClass();
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->once())
->method('supports')
->with($value)
;
$this->assertFalse($this->delegatorWriter->supports($value));
}
public function testSupported()
{
$value = '';
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->never())
->method('supports')
;
$this->assertTrue($this->delegatorWriter->supports($value));
}
public function testAll()
{
$value = ['a'];
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateInt->expects($this->once())->method('prepare');
$this->delegateString->expects($this->once())->method('prepare');
$this->delegateArray->expects($this->once())->method('prepare');
$this->delegateInt->expects($this->once())->method('finish');
$this->delegateString->expects($this->once())->method('finish');
$this->delegateArray->expects($this->once())->method('finish');
$this->delegateInt->expects($this->never())->method('write');
$this->delegateString->expects($this->never())->method('write');
$this->delegateArray
->expects($this->once())
->method('write')
->with($value)
;
$this->delegatorWriter->prepare();
$this->delegatorWriter->write($value);
$this->delegatorWriter->finish();
}
}

View File

@@ -0,0 +1,37 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter;
use PHPUnit\Framework\TestCase;
class PortWriterAdapterTest extends TestCase
{
public function testAll()
{
$value = 'not an array';
$writer = $this->getMockBuilder('\Port\Writer')
->setMethods(['prepare', 'finish', 'writeItem'])
->getMock()
;
$writer
->expects($this->once())
->method('prepare')
;
$writer
->expects($this->once())
->method('finish')
;
$writer
->expects($this->once())
->method('writeItem')
->with([$value])
;
$adapter = new PortWriterAdapter($writer);
$adapter->prepare();
$adapter->write($value);
$adapter->finish();
}
}

View File

@@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\ORM\EntityManagerInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
@@ -18,8 +19,8 @@ class ScheduledDataflowManagerTest extends TestCase
/** @var ScheduledDataflowManager */
private $manager;
/** @var EntityManagerInterface|MockObject */
private $em;
/** @var Connection|MockObject */
private $connection;
/** @var ScheduledDataflowRepository|MockObject */
private $scheduledDataflowRepository;
@@ -29,17 +30,18 @@ class ScheduledDataflowManagerTest extends TestCase
protected function setUp(): void
{
$this->em = $this->createMock(EntityManagerInterface::class);
$this->connection = $this->createMock(Connection::class);
$this->scheduledDataflowRepository = $this->createMock(ScheduledDataflowRepository::class);
$this->jobRepository = $this->createMock(JobRepository::class);
$this->manager = new ScheduledDataflowManager($this->em, $this->scheduledDataflowRepository, $this->jobRepository);
$this->manager = new ScheduledDataflowManager($this->connection, $this->scheduledDataflowRepository, $this->jobRepository);
}
public function testCreateJobsFromScheduledDataflows()
{
$scheduled1 = new ScheduledDataflow();
$scheduled2 = (new ScheduledDataflow())
->setId(-1)
->setDataflowType($type = 'testType')
->setOptions($options = ['opt' => 'val'])
->setNext($next = new \DateTime())
@@ -60,9 +62,13 @@ class ScheduledDataflowManagerTest extends TestCase
->willReturnOnConsecutiveCalls(new Job(), null)
;
$this->em
$this->connection
->expects($this->once())
->method('persist')
->method('beginTransaction')
;
$this->jobRepository
->expects($this->once())
->method('save')
->with(
$this->callback(function (Job $job) use ($type, $options, $next, $label, $scheduled2) {
return (
@@ -71,19 +77,65 @@ class ScheduledDataflowManagerTest extends TestCase
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflow() === $scheduled2
&& $job->getScheduledDataflowId() === $scheduled2->getId()
);
})
)
;
$this->em
$this->scheduledDataflowRepository
->expects($this->once())
->method('flush')
->method('save')
->with($scheduled2)
;
$this->connection
->expects($this->once())
->method('commit')
;
$this->manager->createJobsFromScheduledDataflows();
$this->assertEquals($next->add(\DateInterval::createFromDateString($frequency)), $scheduled2->getNext());
}
public function testCreateJobsFromScheduledDataflowsWithError()
{
$scheduled1 = new ScheduledDataflow();
$this->scheduledDataflowRepository
->expects($this->once())
->method('findReadyToRun')
->willReturn([$scheduled1])
;
$this->jobRepository
->expects($this->exactly(1))
->method('findPendingForScheduledDataflow')
->withConsecutive([$scheduled1])
->willThrowException(new \Exception())
;
$this->connection
->expects($this->once())
->method('beginTransaction')
;
$this->jobRepository
->expects($this->never())
->method('save')
;
$this->connection
->expects($this->never())
->method('commit')
;
$this->connection
->expects($this->once())
->method('rollBack')
;
$this->expectException(\Exception::class);
$this->manager->createJobsFromScheduledDataflows();
}
}

View File

@@ -20,9 +20,6 @@ class PendingDataflowRunnerTest extends TestCase
/** @var PendingDataflowRunner */
private $runner;
/** @var EntityManagerInterface|MockObject */
private $em;
/** @var JobRepository|MockObject */
private $repository;
@@ -34,12 +31,11 @@ class PendingDataflowRunnerTest extends TestCase
protected function setUp(): void
{
$this->em = $this->createMock(EntityManagerInterface::class);
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->runner = new PendingDataflowRunner($this->em, $this->repository, $this->registry, $this->dispatcher);
$this->runner = new PendingDataflowRunner($this->repository, $this->registry, $this->dispatcher);
}
public function testRunPendingDataflows()
@@ -62,36 +58,68 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
)
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::AFTER_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::AFTER_PROCESSING,
]
);
}
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);
@@ -103,10 +131,8 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
;
$bag1 = new \SplObjectStorage();
$bag1->attach(new \Exception('message1'));
$bag2 = new \SplObjectStorage();
$bag2->attach(new \Exception('message2'));
$bag1 = [new \Exception('message1')];
$bag2 = [new \Exception('message2')];
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
@@ -124,9 +150,9 @@ class PendingDataflowRunnerTest extends TestCase
->willReturn($result2)
;
$this->em
$this->repository
->expects($this->exactly(4))
->method('flush')
->method('save')
;
$this->runner->runPendingDataflows();

3
UPGRADE.md Normal file
View File

@@ -0,0 +1,3 @@
# Upgrade from v1.x to v2.0
[BC] `JobRepository` and `ScheduledDataflowRepository` are no longer a Doctrine ORM repository.

View File

@@ -37,20 +37,19 @@
},
"require": {
"php": "^7.1",
"doctrine/orm": "^2.4.5",
"seld/signal-handler": "^1.0",
"symfony/config": "^3.4||^4.0",
"symfony/console": "^3.4||^4.0",
"symfony/dependency-injection": "^3.4||^4.0",
"symfony/event-dispatcher": "^3.4||^4.0",
"symfony/http-kernel": "^3.4||^4.0",
"symfony/lock": "^3.4||^4.0",
"symfony/options-resolver": "^3.4||^4.0",
"symfony/validator": "^3.4||^4.0",
"symfony/yaml": "^3.4||^4.0"
"doctrine/dbal": "^2.0",
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||^4.0||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0",
"doctrine/doctrine-bundle": "^1.0||^2.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.15",
"phpunit/phpunit": "^7||^8"
},
"suggest": {
@@ -61,7 +60,9 @@
},
"extra": {
"branch-alias": {
"dev-master": "1.x-dev"
"dev-master": "2.1.x-dev",
"dev-v2.0.x": "2.0.x-dev",
"dev-v1.x": "1.x-dev"
}
}
}

View File

@@ -9,6 +9,9 @@ use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompil
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;
/**
* @codeCoverageIgnore
*/
class CodeRhapsodieDataflowBundle extends Bundle
{
protected $name = 'CodeRhapsodieDataflowBundle';

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
@@ -14,6 +15,9 @@ use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Validator\Validator\ValidatorInterface;
/**
* @codeCoverageIgnore
*/
class AddScheduledDataflowCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
@@ -25,13 +29,17 @@ class AddScheduledDataflowCommand extends Command
/** @var ValidatorInterface */
private $validator;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->validator = $validator;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -44,10 +52,12 @@ class AddScheduledDataflowCommand extends Command
->setHelp('The <info>%command.name%</info> allows you to create a new scheduled dataflow.')
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
->addOption('options', null, InputOption::VALUE_OPTIONAL, 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
->addOption('options', null, InputOption::VALUE_OPTIONAL,
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow');
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -55,6 +65,9 @@ class AddScheduledDataflowCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$choices = [];
$typeMapping = [];
foreach ($this->registry->listDataflowTypes() as $fqcn => $dataflowType) {
@@ -74,11 +87,13 @@ class AddScheduledDataflowCommand extends Command
}
$options = $input->getOption('options');
if (!$options) {
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', json_encode([]));
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
json_encode([]));
}
$frequency = $input->getOption('frequency');
if (!$frequency) {
$frequency = $io->choice('What is the frequency for the scheduled dataflow?', ScheduledDataflow::AVAILABLE_FREQUENCIES);
$frequency = $io->choice('What is the frequency for the scheduled dataflow?',
ScheduledDataflow::AVAILABLE_FREQUENCIES);
}
$firstRun = $input->getOption('first_run');
if (!$firstRun) {
@@ -89,44 +104,27 @@ class AddScheduledDataflowCommand extends Command
$enabled = $io->confirm('Enable the scheduled dataflow?');
}
try {
$newScheduledDataflow = $this->createEntityFromArray([
'label' => $label,
'type' => $type,
'options' => $options,
'frequency' => $frequency,
'first_run' => $firstRun,
'enabled' => $enabled,
]);
$newScheduledDataflow = ScheduledDataflow::createFromArray([
'id' => null,
'label' => $label,
'dataflow_type' => $type,
'options' => json_decode($options, true),
'frequency' => $frequency,
'next' => new \DateTimeImmutable($firstRun),
'enabled' => $enabled,
]);
$errors = $this->validator->validate($newScheduledDataflow);
if (count($errors) > 0) {
$io->error((string) $errors);
$errors = $this->validator->validate($newScheduledDataflow);
if (count($errors) > 0) {
$io->error((string) $errors);
return 2;
}
$this->scheduledDataflowRepository->save($newScheduledDataflow);
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.', $newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
return 0;
} catch (\Exception $e) {
$io->error(sprintf('An error occured when creating new scheduled dataflow : "%s".', $e->getMessage()));
return 1;
return 2;
}
}
private function createEntityFromArray(array $input): ScheduledDataflow
{
$scheduledDataflow = new ScheduledDataflow();
$scheduledDataflow->setLabel($input['label']);
$scheduledDataflow->setDataflowType($input['type']);
$scheduledDataflow->setOptions(json_decode($input['options'], true));
$scheduledDataflow->setFrequency($input['frequency']);
$scheduledDataflow->setNext(new \DateTimeImmutable($input['first_run']));
$scheduledDataflow->setEnabled($input['enabled']);
$this->scheduledDataflowRepository->save($newScheduledDataflow);
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.',
$newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
return $scheduledDataflow;
return 0;
}
}

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
@@ -13,6 +14,9 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
class ChangeScheduleStatusCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
@@ -20,11 +24,15 @@ class ChangeScheduleStatusCommand extends Command
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -37,7 +45,8 @@ class ChangeScheduleStatusCommand extends Command
->setHelp('The <info>%command.name%</info> command able you to change schedule status.')
->addArgument('schedule-id', InputArgument::REQUIRED, 'Id of the schedule')
->addOption('enable', null, InputOption::VALUE_NONE, 'Enable the schedule')
->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule');
->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -45,9 +54,12 @@ class ChangeScheduleStatusCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
/** @var ScheduledDataflow|null $schedule */
$schedule = $this->scheduledDataflowRepository->find($input->getArgument('schedule-id'));
$schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id'));
if (!$schedule) {
$io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));

View File

@@ -4,14 +4,19 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Runs one dataflow.
*
* @codeCoverageIgnore
*/
class ExecuteDataflowCommand extends Command
{
@@ -20,11 +25,19 @@ class ExecuteDataflowCommand extends Command
/** @var DataflowTypeRegistryInterface */
private $registry;
public function __construct(DataflowTypeRegistryInterface $registry)
/** @var ConnectionFactory */
private $connectionFactory;
/** @var LoggerInterface */
private $logger;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
$this->logger = $logger;
}
/**
@@ -42,7 +55,7 @@ EOF
)
->addArgument('fqcn', InputArgument::REQUIRED, 'FQCN or alias of the dataflow type')
->addArgument('options', InputArgument::OPTIONAL, 'Options for the dataflow type as a json string', '[]')
;
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -50,6 +63,9 @@ EOF
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);
@@ -61,6 +77,17 @@ EOF
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$output->writeln('Success: '.$result->getSuccessCount());
if ($result->hasErrors() > 0) {
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
foreach ($result->getExceptions() as $e) {
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
}
return 1;
}
return 0;
}
}

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
@@ -12,6 +13,9 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
class JobShowCommand extends Command
{
private const STATUS_MAPPING = [
@@ -25,11 +29,15 @@ class JobShowCommand extends Command
/** @var JobRepository */
private $jobRepository;
public function __construct(JobRepository $jobRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(JobRepository $jobRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->jobRepository = $jobRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -42,7 +50,8 @@ class JobShowCommand extends Command
->setHelp('The <info>%command.name%</info> display job details for schedule or specific job.')
->addOption('job-id', null, InputOption::VALUE_REQUIRED, 'Id of the job to get details')
->addOption('schedule-id', null, InputOption::VALUE_REQUIRED, 'Id of schedule for last execution details')
->addOption('details', null, InputOption::VALUE_NONE, 'Display full details');
->addOption('details', null, InputOption::VALUE_NONE, 'Display full details')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -50,6 +59,10 @@ class JobShowCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
$jobId = (int) $input->getOption('job-id');

View File

@@ -4,15 +4,19 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Runs dataflows according to user-defined schedule.
*
* @codeCoverageIgnore
*/
class RunPendingDataflowsCommand extends Command
{
@@ -26,12 +30,16 @@ class RunPendingDataflowsCommand extends Command
/** @var PendingDataflowRunnerInterface */
private $runner;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->manager = $manager;
$this->runner = $runner;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -45,7 +53,7 @@ class RunPendingDataflowsCommand extends Command
The <info>%command.name%</info> command runs dataflows according to the schedule defined in the UI by the user.
EOF
)
;
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -59,6 +67,10 @@ EOF
return 0;
}
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$this->manager->createJobsFromScheduledDataflows();
$this->runner->runPendingDataflows();

View File

@@ -4,12 +4,17 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
class ScheduleListCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
@@ -17,11 +22,15 @@ class ScheduleListCommand extends Command
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -31,7 +40,8 @@ class ScheduleListCommand extends Command
{
$this
->setDescription('List scheduled dataflows')
->setHelp('The <info>%command.name%</info> lists all scheduled dataflows.');
->setHelp('The <info>%command.name%</info> lists all scheduled dataflows.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -39,6 +49,9 @@ class ScheduleListCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
$display = [];
$schedules = $this->scheduledDataflowRepository->listAllOrderedByLabel();
@@ -48,7 +61,7 @@ class ScheduleListCommand extends Command
$schedule['label'],
$schedule['enabled'] ? 'yes' : 'no',
$schedule['startTime'] ? (new \DateTime($schedule['startTime']))->format('Y-m-d H:i:s') : '-',
$schedule['next'] ? $schedule['next']->format('Y-m-d H:i:s') : '-',
$schedule['next'] ? (new \DateTime($schedule['next']))->format('Y-m-d H:i:s') : '-',
];
}

View File

@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
class SchemaCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:dump-schema';
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->connectionFactory = $connectionFactory;
}
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setDescription('Generates schema create / update SQL queries')
->setHelp('The <info>%command.name%</info> help you to generate SQL Query to create or update your database schema for this bundle')
->addOption('update', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use')
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$connection = $this->connectionFactory->getConnection();
$schemaProvider = new DataflowSchemaProvider();
$schema = $schemaProvider->createSchema();
$sqls = $schema->toSql($connection->getDatabasePlatform());
if ($input->getOption('update')) {
$sm = $connection->getSchemaManager();
$tableArray = [JobRepository::TABLE_NAME, ScheduledDataflowRepository::TABLE_NAME];
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
if (in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
$namespaces = [];
if ($connection->getDatabasePlatform()->supportsSchemas()) {
$namespaces = $sm->listNamespaceNames();
}
$sequences = [];
if ($connection->getDatabasePlatform()->supportsSequences()) {
$sequences = $sm->listSequences();
}
$oldSchema = new Schema($tables, $sequences, $sm->createSchemaConfig(), $namespaces);
$sqls = $schema->getMigrateFromSql($oldSchema, $connection->getDatabasePlatform());
}
$io = new SymfonyStyle($input, $output);
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql.';');
}
}
}

View File

@@ -8,6 +8,9 @@ use Symfony\Component\OptionsResolver\OptionsResolver;
abstract class AbstractDataflowType implements DataflowTypeInterface
{
/**
* @codeCoverageIgnore
*/
public function getAliases(): iterable
{
return [];
@@ -28,6 +31,9 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
return $dataflow->process();
}
/**
* @codeCoverageIgnore
*/
protected function configureOptions(OptionsResolver $optionsResolver): void
{
}

View File

@@ -6,8 +6,6 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\InterruptedProcessingException;
use Seld\Signal\SignalHandler;
class Dataflow implements DataflowInterface
{
@@ -23,10 +21,6 @@ class Dataflow implements DataflowInterface
/** @var WriterInterface[] */
private $writers = [];
/**
* @param iterable $reader
* @param string|null $name
*/
public function __construct(iterable $reader, ?string $name)
{
$this->reader = $reader;
@@ -34,8 +28,6 @@ class Dataflow implements DataflowInterface
}
/**
* @param callable $step
*
* @return $this
*/
public function addStep(callable $step): self
@@ -46,8 +38,6 @@ class Dataflow implements DataflowInterface
}
/**
* @param WriterInterface $writer
*
* @return $this
*/
public function addWriter(WriterInterface $writer): self
@@ -63,13 +53,9 @@ class Dataflow implements DataflowInterface
public function process(): Result
{
$count = 0;
$exceptions = new \SplObjectStorage();
$exceptions = [];
$startTime = new \DateTime();
SignalHandler::create(['SIGTERM', 'SIGINT'], function () {
throw new InterruptedProcessingException();
});
foreach ($this->writers as $writer) {
$writer->prepare();
}
@@ -77,8 +63,8 @@ class Dataflow implements DataflowInterface
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Exception $e) {
$exceptions->attach($e, $index);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
}
++$count;

View File

@@ -13,8 +13,6 @@ interface DataflowInterface
{
/**
* Processes the data.
*
* @return Result
*/
public function process(): Result;
}

View File

@@ -4,6 +4,9 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
/**
* @codeCoverageIgnore
*/
class Result
{
/** @var string */
@@ -27,17 +30,10 @@ class Result
/** @var int */
private $totalProcessedCount = 0;
/** @var \SplObjectStorage */
/** @var array */
private $exceptions;
/**
* @param string $name
* @param \DateTimeInterface $startTime
* @param \DateTimeInterface $endTime
* @param int $totalCount
* @param \SplObjectStorage $exceptions
*/
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, \SplObjectStorage $exceptions)
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
{
$this->name = $name;
$this->startTime = $startTime;
@@ -49,74 +45,47 @@ class Result
$this->exceptions = $exceptions;
}
/**
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @return \DateTimeInterface
*/
public function getStartTime(): \DateTimeInterface
{
return $this->startTime;
}
/**
* @return \DateTimeInterface
*/
public function getEndTime(): \DateTimeInterface
{
return $this->endTime;
}
/**
* @return \DateInterval
*/
public function getElapsed(): \DateInterval
{
return $this->elapsed;
}
/**
* @return int
*/
public function getErrorCount(): int
{
return $this->errorCount;
}
/**
* @return int
*/
public function getSuccessCount(): int
{
return $this->successCount;
}
/**
* @return int
*/
public function getTotalProcessedCount(): int
{
return $this->totalProcessedCount;
}
/**
* @return bool
*/
public function hasErrors(): bool
{
return $this->errorCount > 0;
}
/**
* @return \SplObjectStorage
*/
public function getExceptions(): \SplObjectStorage
public function getExceptions(): array
{
return $this->exceptions;
}

View File

@@ -0,0 +1,62 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
/**
* Delegates the writing of each item in a collection to an embedded writer.
*/
class CollectionWriter implements DelegateWriterInterface
{
/** @var WriterInterface */
private $writer;
/**
* CollectionWriter constructor.
*/
public function __construct(WriterInterface $writer)
{
$this->writer = $writer;
}
/**
* {@inheritdoc}
*/
public function prepare()
{
$this->writer->prepare();
}
/**
* {@inheritdoc}
*/
public function write($collection)
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', is_object($collection) ? get_class($collection) : gettype($collection)));
}
foreach ($collection as $item) {
$this->writer->write($item);
}
}
/**
* {@inheritdoc}
*/
public function finish()
{
$this->writer->finish();
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
return is_iterable($item);
}
}

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
/**
* A writer that can be used as a delegate of DelegatorWriter.
*/
interface DelegateWriterInterface extends WriterInterface
{
/**
* Returns true if the argument is of a supported type.
*
* @param $item
*/
public function supports($item): bool;
}

View File

@@ -0,0 +1,96 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
/**
* Writer that delegated the actual writing to other writers.
*/
class DelegatorWriter implements DelegateWriterInterface
{
/** @var DelegateWriterInterface[] */
private $delegates;
/**
* DelegatorWriter constructor.
*/
public function __construct()
{
$this->delegates = [];
}
/**
* {@inheritdoc}
*/
public function prepare()
{
foreach ($this->delegates as $delegate) {
$delegate->prepare();
}
}
/**
* {@inheritdoc}
*/
public function write($item)
{
foreach ($this->delegates as $delegate) {
if (!$delegate->supports($item)) {
continue;
}
$delegate->write($item);
return;
}
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', is_object($item) ? get_class($item) : gettype($item)));
}
/**
* {@inheritdoc}
*/
public function finish()
{
foreach ($this->delegates as $delegate) {
$delegate->finish();
}
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
foreach ($this->delegates as $delegate) {
if ($delegate->supports($item)) {
return true;
}
}
return false;
}
/**
* Registers a collection of delegates.
*
* @param iterable|DelegateWriterInterface[] $delegates
*/
public function addDelegates(iterable $delegates): void
{
foreach ($delegates as $delegate) {
$this->addDelegate($delegate);
}
}
/**
* Registers one delegate.
*/
public function addDelegate(DelegateWriterInterface $delegate): void
{
$this->delegates[] = $delegate;
}
}

View File

@@ -4,11 +4,25 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
/**
* Represents a writer for dataflows.
*/
interface WriterInterface
{
/**
* Called before the dataflow is processed.
*/
public function prepare();
/**
* Write an item.
*
* @param mixed $item
*/
public function write($item);
/**
* Called after the dataflow is processed.
*/
public function finish();
}

View File

@@ -10,6 +10,9 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\Extension;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
/**
* @codeCoverageIgnore
*/
class CodeRhapsodieDataflowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
@@ -21,5 +24,9 @@ class CodeRhapsodieDataflowExtension extends Extension
->registerForAutoconfiguration(DataflowTypeInterface::class)
->addTag('coderhapsodie.dataflow.type')
;
$configuration = new Configuration();
$config = $this->processConfiguration($configuration, $configs);
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
}
}

View File

@@ -11,6 +11,8 @@ use Symfony\Component\DependencyInjection\Reference;
/**
* Registers dataflow types in the registry.
*
* @codeCoverageIgnore
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{

View File

@@ -0,0 +1,27 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder()
{
$treeBuilder = new TreeBuilder();
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
$rootNode
->children()
->scalarNode('dbal_default_connection')
->defaultValue('default')
->end()
->end()
;
return $treeBuilder;
}
}

View File

@@ -4,13 +4,12 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Validator\Constraints as Asserts;
/**
* Dataflow execution status.
*
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\JobRepository")
* @ORM\Table(name="cr_dataflow_job")
* @codeCoverageIgnore
*/
class Job
{
@@ -18,89 +17,88 @@ class Job
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
private const KEYS = [
'id',
'status',
'label',
'dataflow_type',
'options',
'requested_date',
'scheduled_dataflow_id',
'count',
'exceptions',
'start_time',
'end_time',
];
/**
* @var int
*
* @ORM\Id()
* @ORM\Column(type="integer")
* @ORM\GeneratedValue(strategy="AUTO")
* @var int|null
*/
private $id;
/**
* @var int
*
* @ORM\Column(type="integer")
* @Asserts\Range(min=0, max=2)
*/
private $status;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*
* @ORM\Column(type="json")
*/
private $options;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
* @Asserts\DateTime()
*/
private $requestedDate;
/**
* @var ScheduledDataflow|null
*
* @ORM\ManyToOne(targetEntity="ScheduledDataflow", inversedBy="jobs")
* @ORM\JoinColumn(nullable=true)
* @var int|null
*/
private $scheduledDataflow;
private $scheduledDataflowId;
/**
* @var int|null
*
* @ORM\Column(type="integer", nullable=true)
*/
private $count;
/**
* @var array|null
*
* @ORM\Column(type="json", nullable=true)
*/
private $exceptions;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $startTime;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $endTime;
/**
* @param ScheduledDataflow $scheduled
*
* @return Job
*/
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
@@ -111,31 +109,72 @@ class Job
->setOptions($scheduled->getOptions())
->setRequestedDate(clone $scheduled->getNext())
->setLabel($scheduled->getLabel())
->setScheduledDataflow($scheduled)
;
->setScheduledDataflowId($scheduled->getId());
}
/**
* @return int
*/
public function getId(): int
public function __construct()
{
$this->count = 0;
$this->status = static::STATUS_PENDING;
}
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$job = new self();
$job->id = null === $datas['id'] ? null : (int) $datas['id'];
$job->setStatus(null === $datas['status'] ? null : (int) $datas['status']);
$job->setLabel($datas['label']);
$job->setDataflowType($datas['dataflow_type']);
$job->setOptions($datas['options']);
$job->setRequestedDate($datas['requested_date']);
$job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']);
$job->setCount(null === $datas['count'] ? null : (int) $datas['count']);
$job->setExceptions($datas['exceptions']);
$job->setStartTime($datas['start_time']);
$job->setEndTime($datas['end_time']);
return $job;
}
public function toArray(): array
{
return [
'id' => $this->getId(),
'status' => $this->getStatus(),
'label' => $this->getLabel(),
'dataflow_type' => $this->getDataflowType(),
'options' => $this->getOptions(),
'requested_date' => $this->getRequestedDate(),
'scheduled_dataflow_id' => $this->getScheduledDataflowId(),
'count' => $this->getCount(),
'exceptions' => $this->getExceptions(),
'start_time' => $this->getStartTime(),
'end_time' => $this->getEndTime(),
];
}
public function setId(int $id): Job
{
$this->id = $id;
return $this;
}
public function getId(): ?int
{
return $this->id;
}
/**
* @return int
*/
public function getStatus(): int
{
return $this->status;
}
/**
* @param int $status
*
* @return Job
*/
public function setStatus(int $status): Job
{
$this->status = $status;
@@ -143,19 +182,11 @@ class Job
return $this;
}
/**
* @return string|null
*/
public function getLabel(): ?string
{
return $this->label;
}
/**
* @param string|null $label
*
* @return Job
*/
public function setLabel(?string $label): Job
{
$this->label = $label;
@@ -163,19 +194,11 @@ class Job
return $this;
}
/**
* @return string|null
*/
public function getDataflowType(): ?string
{
return $this->dataflowType;
}
/**
* @param string|null $dataflowType
*
* @return Job
*/
public function setDataflowType(?string $dataflowType): Job
{
$this->dataflowType = $dataflowType;
@@ -183,19 +206,11 @@ class Job
return $this;
}
/**
* @return array|null
*/
public function getOptions(): ?array
{
return $this->options;
}
/**
* @param array|null $options
*
* @return Job
*/
public function setOptions(?array $options): Job
{
$this->options = $options;
@@ -203,19 +218,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getRequestedDate(): ?\DateTimeInterface
{
return $this->requestedDate;
}
/**
* @param \DateTimeInterface|null $requestedDate
*
* @return Job
*/
public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
{
$this->requestedDate = $requestedDate;
@@ -223,39 +230,23 @@ class Job
return $this;
}
/**
* @return ScheduledDataflow|null
*/
public function getScheduledDataflow(): ?ScheduledDataflow
public function getScheduledDataflowId(): ?int
{
return $this->scheduledDataflow;
return $this->scheduledDataflowId;
}
/**
* @param ScheduledDataflow|null $scheduledDataflow
*
* @return Job
*/
public function setScheduledDataflow(?ScheduledDataflow $scheduledDataflow): Job
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
{
$this->scheduledDataflow = $scheduledDataflow;
$this->scheduledDataflowId = $scheduledDataflowId;
return $this;
}
/**
* @return int|null
*/
public function getCount(): ?int
{
return $this->count;
}
/**
* @param int|null $count
*
* @return Job
*/
public function setCount(?int $count): Job
{
$this->count = $count;
@@ -263,19 +254,11 @@ class Job
return $this;
}
/**
* @return array|null
*/
public function getExceptions(): ?array
{
return $this->exceptions;
}
/**
* @param array|null $exceptions
*
* @return Job
*/
public function setExceptions(?array $exceptions): Job
{
$this->exceptions = $exceptions;
@@ -283,19 +266,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getStartTime(): ?\DateTimeInterface
{
return $this->startTime;
}
/**
* @param \DateTimeInterface|null $startTime
*
* @return Job
*/
public function setStartTime(?\DateTimeInterface $startTime): Job
{
$this->startTime = $startTime;
@@ -303,19 +278,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getEndTime(): ?\DateTimeInterface
{
return $this->endTime;
}
/**
* @param \DateTimeInterface|null $endTime
*
* @return Job
*/
public function setEndTime(?\DateTimeInterface $endTime): Job
{
$this->endTime = $endTime;

View File

@@ -5,13 +5,12 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Entity;
use CodeRhapsodie\DataflowBundle\Validator\Constraints\Frequency;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Validator\Constraints as Asserts;
/**
* Schedule for a regular execution of a dataflow.
*
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository")
* @ORM\Table(name="cr_dataflow_scheduled")
* @codeCoverageIgnore
*/
class ScheduledDataflow
{
@@ -22,88 +21,104 @@ class ScheduledDataflow
'1 month',
];
private const KEYS = ['id', 'label', 'dataflow_type', 'options', 'frequency', 'next', 'enabled'];
/**
* @var int
*
* @ORM\Id()
* @ORM\Column(name="id", type="integer")
* @ORM\GeneratedValue(strategy="AUTO")
* @var int|null
*/
private $id;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*
* @ORM\Column(type="json")
*/
private $options;
/**
* @var string|null
*
* @ORM\Column(type="string")
*
* @Asserts\NotBlank()
* @Frequency()
*/
private $frequency;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $next;
/**
* @var bool|null
*
* @ORM\Column(type="boolean")
*/
private $enabled;
/**
* @var Job[]
*
* @ORM\OneToMany(targetEntity="Job", mappedBy="scheduledDataflow", cascade={"persist", "remove"})
* @ORM\OrderBy({"startTime" = "DESC"})
*/
private $jobs;
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
/**
* @return int
*/
public function getId(): int
$scheduledDataflow = new self();
$scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id'];
$scheduledDataflow->setLabel($datas['label']);
$scheduledDataflow->setDataflowType($datas['dataflow_type']);
$scheduledDataflow->setOptions($datas['options']);
$scheduledDataflow->setFrequency($datas['frequency']);
$scheduledDataflow->setNext($datas['next']);
$scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']);
return $scheduledDataflow;
}
public function toArray(): array
{
return [
'id' => $this->getId(),
'label' => $this->getLabel(),
'dataflow_type' => $this->getDataflowType(),
'options' => $this->getOptions(),
'frequency' => $this->getFrequency(),
'next' => $this->getNext(),
'enabled' => $this->getEnabled(),
];
}
public function setId(int $id): ScheduledDataflow
{
$this->id = $id;
return $this;
}
public function getId(): ?int
{
return $this->id;
}
/**
* @return string|null
*/
public function getLabel(): ?string
{
return $this->label;
}
/**
* @param string|null $label
*
* @return ScheduledDataflow
*/
public function setLabel(?string $label): ScheduledDataflow
{
$this->label = $label;
@@ -111,19 +126,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return string|null
*/
public function getDataflowType(): ?string
{
return $this->dataflowType;
}
/**
* @param string|null $dataflowType
*
* @return ScheduledDataflow
*/
public function setDataflowType(?string $dataflowType): ScheduledDataflow
{
$this->dataflowType = $dataflowType;
@@ -131,19 +138,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return array|null
*/
public function getOptions(): ?array
{
return $this->options;
}
/**
* @param array|null $options
*
* @return ScheduledDataflow
*/
public function setOptions(?array $options): ScheduledDataflow
{
$this->options = $options;
@@ -151,19 +150,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return string|null
*/
public function getFrequency(): ?string
{
return $this->frequency;
}
/**
* @param string|null $frequency
*
* @return ScheduledDataflow
*/
public function setFrequency(?string $frequency): ScheduledDataflow
{
$this->frequency = $frequency;
@@ -171,19 +162,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getNext(): ?\DateTimeInterface
{
return $this->next;
}
/**
* @param \DateTimeInterface|null $next
*
* @return ScheduledDataflow
*/
public function setNext(?\DateTimeInterface $next): ScheduledDataflow
{
$this->next = $next;
@@ -191,19 +174,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return bool|null
*/
public function getEnabled(): ?bool
{
return $this->enabled;
}
/**
* @param bool|null $enabled
*
* @return ScheduledDataflow
*/
public function setEnabled(?bool $enabled): ScheduledDataflow
{
$this->enabled = $enabled;

20
src/Event/CrEvent.php Normal file
View File

@@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Event;
/*
* @codeCoverageIgnore
*/
if (class_exists('Symfony\Contracts\EventDispatcher\Event')) {
// For Symfony 5.0+
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
{
}
} else {
// For Symfony 3.4 to 4.4
abstract class CrEvent extends \Symfony\Component\EventDispatcher\Event
{
}
}

View File

@@ -5,29 +5,25 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Event;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use Symfony\Component\EventDispatcher\Event;
/**
* Event used during the dataflow lifecycle.
*
* @codeCoverageIgnore
*/
class ProcessingEvent extends Event
class ProcessingEvent extends CrEvent
{
/** @var Job */
private $job;
/**
* ProcessingEvent constructor.
*
* @param Job $job
*/
public function __construct(Job $job)
{
$this->job = $job;
}
/**
* @return Job
*/
public function getJob(): Job
{
return $this->job;

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Exceptions;
/**
* Exception thrown when a writer receives an item of an unsupported type.
*/
class UnsupportedItemTypeException extends \Exception
{
}

View File

@@ -0,0 +1,35 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Factory;
use Symfony\Component\DependencyInjection\Container;
/**
* Class ConnectionFactory.
*
* @codeCoverageIgnore
*/
class ConnectionFactory
{
private $connectionName;
private $container;
public function __construct(Container $container, string $connectionName)
{
$this->connectionName = $connectionName;
$this->container = $container;
}
public function setConnectionName(string $connectionName)
{
$this->connectionName = $connectionName;
}
public function getConnection(): \Doctrine\DBAL\Driver\Connection
{
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}
}

View File

@@ -4,29 +4,29 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Manager;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Doctrine\ORM\EntityManagerInterface;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
/**
* Handles scheduled dataflows execution dates based on their frequency.
*/
class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
{
/** @var EntityManagerInterface */
private $em;
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var JobRepository */
private $jobRepository;
public function __construct(EntityManagerInterface $em, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
/** @var Connection */
private $connection;
public function __construct(Connection $connection, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
{
$this->em = $em;
$this->connection = $connection;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->jobRepository = $jobRepository;
}
@@ -36,21 +36,23 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
*/
public function createJobsFromScheduledDataflows(): void
{
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
continue;
$this->connection->beginTransaction();
try {
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
continue;
}
$this->createPendingForScheduled($scheduled);
$this->updateScheduledDataflowNext($scheduled);
}
$this->createPendingForScheduled($scheduled);
$this->updateScheduledDataflowNext($scheduled);
} catch (\Throwable $e) {
$this->connection->rollBack();
throw $e;
}
$this->em->flush();
$this->connection->commit();
}
/**
* @param ScheduledDataflow $scheduled
*/
private function updateScheduledDataflowNext(ScheduledDataflow $scheduled): void
{
$interval = \DateInterval::createFromDateString($scheduled->getFrequency());
@@ -62,13 +64,11 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
}
$scheduled->setNext($next);
$this->scheduledDataflowRepository->save($scheduled);
}
/**
* @param ScheduledDataflow $scheduled
*/
private function createPendingForScheduled(ScheduledDataflow $scheduled): void
{
$this->em->persist(Job::createFromScheduledDataflow($scheduled));
$this->jobRepository->save(Job::createFromScheduledDataflow($scheduled));
}
}

View File

@@ -4,8 +4,8 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Registry;
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
/**
* Array based dataflow types registry.
@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw new UnknownDataflowTypeException();
throw new UnknownDataflowTypeException($fqcnOrAlias);
}
/**

View File

@@ -13,10 +13,6 @@ interface DataflowTypeRegistryInterface
{
/**
* Get a registered dataflow type from its FQCN or one of its aliases.
*
* @param string $fqcnOrAlias
*
* @return DataflowTypeInterface
*/
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface;
@@ -29,8 +25,6 @@ interface DataflowTypeRegistryInterface
/**
* Registers a dataflow type.
*
* @param DataflowTypeInterface $dataflowType
*/
public function registerDataflowType(DataflowTypeInterface $dataflowType): void;
}

View File

@@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
/**
* @codeCoverageIgnore
*/
trait InitFromDbTrait
{
private function initDateTime(array $datas): array
{
foreach (static::FIELDS_TYPE as $key => $type) {
if ('datetime' === $type && null !== $datas[$key]) {
$datas[$key] = new \DateTime($datas[$key]);
}
}
return $datas;
}
private function initArray(array $datas): array
{
if (!is_array($datas['options'])) {
$datas['options'] = $this->strToArray($datas['options']);
}
if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) {
$datas['exceptions'] = $this->strToArray($datas['exceptions']);
}
return $datas;
}
private function strToArray($value): array
{
if (null === $value) {
return [];
}
$array = json_decode($value, true);
return (false === $array) ? [] : $array;
}
}

View File

@@ -4,62 +4,174 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use Doctrine\Common\Collections\Criteria;
use Doctrine\ORM\EntityRepository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
/**
* Repository.
*
* @codeCoverageIgnore
*/
class JobRepository extends EntityRepository
class JobRepository
{
use InitFromDbTrait;
public const TABLE_NAME = 'cr_dataflow_job';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'status' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => \PDO::PARAM_INT,
'count' => \PDO::PARAM_INT,
'exceptions' => \PDO::PARAM_STR,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
public function find(int $jobId)
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, \PDO::PARAM_INT)))
;
return $this->returnFirstOrNull($qb);
}
public function findOneshotDataflows(): iterable
{
return $this->findBy([
'scheduledDataflow' => null,
'status' => Job::STATUS_PENDING,
]);
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($this->initDateTime($this->initArray($row)));
}
}
public function findPendingForScheduledDataflow(ScheduledDataflow $scheduled): ?Job
{
return $this->findOneBy([
'scheduledDataflow' => $scheduled->getId(),
'status' => Job::STATUS_PENDING,
]);
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
return $this->returnFirstOrNull($qb);
}
public function findNextPendingDataflow(): ?Job
{
$criteria = (new Criteria())
->where(Criteria::expr()->lte('requestedDate', new \DateTime()))
->andWhere(Criteria::expr()->eq('status', Job::STATUS_PENDING))
->orderBy(['requestedDate' => Criteria::ASC])
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('requested_date', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)))
->orderBy('requested_date', 'ASC')
->setMaxResults(1)
;
return $this->matching($criteria)->first() ?: null;
return $this->returnFirstOrNull($qb);
}
public function findLastForDataflowId(int $dataflowId): ?Job
{
return $this->findOneBy(['scheduledDataflow' => $dataflowId], ['requestedDate' => 'desc']);
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, \PDO::PARAM_INT)))
->orderBy('requested_date', 'DESC')
->setMaxResults(1)
;
return $this->returnFirstOrNull($qb);
}
public function findLatests(): iterable
{
return $this->findBy([], ['requestedDate' => 'desc'], 20);
$qb = $this->createQueryBuilder();
$qb
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($row);
}
}
public function findForScheduled(int $id): iterable
{
return $this->findBy(['scheduledDataflow' => $id], ['requestedDate' => 'desc'], 20);
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, \PDO::PARAM_INT)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($row);
}
}
public function save(Job $job)
{
$this->_em->persist($job);
$this->_em->flush();
$datas = $job->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
}
if (is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions']);
}
if (null === $job->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$job->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], static::FIELDS_TYPE);
}
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
$qb->select('*')
->from(static::TABLE_NAME, $alias);
return $qb;
}
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
}
}

View File

@@ -5,14 +5,42 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\Common\Collections\Criteria;
use Doctrine\ORM\EntityRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
/**
* Repository for the ScheduledDataflow entity.
*
* @codeCoverageIgnore
*/
class ScheduledDataflowRepository extends EntityRepository
class ScheduledDataflowRepository
{
use InitFromDbTrait;
public const TABLE_NAME = 'cr_dataflow_scheduled';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'frequency' => \PDO::PARAM_STR,
'next' => 'datetime',
'enabled' => \PDO::PARAM_BOOL,
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
/**
* Finds all enabled scheduled dataflows with a passed next run date.
*
@@ -20,42 +48,105 @@ class ScheduledDataflowRepository extends EntityRepository
*/
public function findReadyToRun(): iterable
{
$criteria = (new Criteria())
->where(Criteria::expr()->lte('next', new \DateTime()))
->andWhere(Criteria::expr()->eq('enabled', 1))
->orderBy(['next' => Criteria::ASC])
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->orderBy('next', 'ASC')
;
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
public function find(int $scheduleId): ?ScheduledDataflow
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, \PDO::PARAM_INT)))
->setMaxResults(1)
;
return $this->matching($criteria);
return $this->returnFirstOrNull($qb);
}
public function findAllOrderedByLabel(): iterable
{
return $this->findBy([], ['label' => 'asc']);
$qb = $this->createQueryBuilder();
$qb->orderBy('label', 'ASC');
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
}
}
public function listAllOrderedByLabel(): array
{
$query = $this->createQueryBuilder('w')
->select('w.id', 'w.label', 'w.enabled', 'w.next', 'max(j.startTime) as startTime')
->leftJoin('w.jobs', 'j')
$query = $this->connection->createQueryBuilder()
->from(static::TABLE_NAME, 'w')
->select('w.id', 'w.label', 'w.enabled', 'w.next', 'max(j.start_time) as startTime')
->leftJoin('w', JobRepository::TABLE_NAME, 'j', 'j.scheduled_dataflow_id = w.id')
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->getQuery()->execute();
return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
}
public function save(ScheduledDataflow $scheduledDataflow)
{
$this->_em->persist($scheduledDataflow);
$this->_em->flush();
$datas = $scheduledDataflow->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
}
if (null === $scheduledDataflow->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $scheduledDataflow->getId()], static::FIELDS_TYPE);
}
public function delete(int $id): void
{
$dataflow = $this->find($id);
$this->connection->beginTransaction();
try {
$this->connection->delete(JobRepository::TABLE_NAME, ['scheduled_dataflow_id' => $id]);
$this->connection->delete(static::TABLE_NAME, ['id' => $id]);
} catch (\Throwable $e) {
$this->connection->rollBack();
throw $e;
}
$this->_em->remove($dataflow);
$this->_em->flush();
$this->connection->commit();
}
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
$qb->select('*')
->from(static::TABLE_NAME, $alias);
return $qb;
}
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
}
}

View File

@@ -10,53 +10,74 @@ services:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$validator: '@validator'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ChangeScheduleStatusCommand:
arguments:
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
$logger: '@logger'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\RunPendingDataflowsCommand:
arguments:
$manager: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface'
$runner: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ScheduleListCommand:
arguments:
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\SchemaCommand:
arguments:
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository:
factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
arguments: ['CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow']
lazy: true
arguments: ['@coderhapsodie.dataflow.connection']
CodeRhapsodie\DataflowBundle\Repository\JobRepository:
factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
arguments: ['CodeRhapsodie\DataflowBundle\Entity\Job']
lazy: true
arguments: ['@coderhapsodie.dataflow.connection']
coderhapsodie.dataflow.connection: "@coderhapsodie.dataflow.connection.internal"
coderhapsodie.dataflow.connection.internal:
lazy: true
class: Doctrine\DBAL\Connection
factory: ['@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory', 'getConnection']
CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory:
arguments: ['@service_container', '%coderhapsodie.dataflow.dbal_default_connection%']
CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager'
CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager:
arguments:
$em: '@doctrine.orm.default_entity_manager'
$connection: '@coderhapsodie.dataflow.connection'
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
arguments:
$em: '@doctrine.orm.default_entity_manager'
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$dispatcher: '@event_dispatcher'

Binary file not shown.

After

Width:  |  Height:  |  Size: 130 KiB

View File

@@ -10,14 +10,10 @@ use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var EntityManagerInterface */
private $em;
/** @var JobRepository */
private $repository;
@@ -27,9 +23,8 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(EntityManagerInterface $em, JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
{
$this->em = $em;
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
@@ -50,24 +45,22 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
}
}
/**
* @param Job $job
*/
private function beforeProcessing(Job $job): void
{
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->em->flush();
$this->repository->save($job);
}
/**
* @param Job $job
* @param Result $result
*/
private function afterProcessing(Job $job, Result $result): void
{
$exceptions = [];
@@ -82,8 +75,13 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
->setCount($result->getSuccessCount())
->setExceptions($exceptions)
;
$this->em->flush();
$this->repository->save($job);
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}
}

View File

@@ -0,0 +1,54 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\SchemaProvider;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Schema\Schema;
/**
* Class JobSchemaProvider.
*
* @codeCoverageIgnore
*/
class DataflowSchemaProvider
{
public function createSchema()
{
$schema = new Schema();
$tableJob = $schema->createTable(JobRepository::TABLE_NAME);
$tableJob->addColumn('id', 'integer', [
'autoincrement' => true,
]);
$tableJob->setPrimaryKey(['id']);
$tableJob->addColumn('scheduled_dataflow_id', 'integer', ['notnull' => false]);
$tableJob->addColumn('status', 'integer', ['notnull' => true]);
$tableJob->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
$tableJob->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
$tableJob->addColumn('options', 'json', ['notnull' => true]);
$tableJob->addColumn('requested_date', 'datetime', ['notnull' => false]);
$tableJob->addColumn('count', 'integer', ['notnull' => false]);
$tableJob->addColumn('exceptions', 'json', ['notnull' => false]);
$tableJob->addColumn('start_time', 'datetime', ['notnull' => false]);
$tableJob->addColumn('end_time', 'datetime', ['notnull' => false]);
$tableSchedule = $schema->createTable(ScheduledDataflowRepository::TABLE_NAME);
$tableSchedule->addColumn('id', 'integer', [
'autoincrement' => true,
]);
$tableSchedule->setPrimaryKey(['id']);
$tableSchedule->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('options', 'json', ['notnull' => true]);
$tableSchedule->addColumn('frequency', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('next', 'datetime', ['notnull' => false]);
$tableSchedule->addColumn('enabled', 'boolean', ['notnull' => true]);
$tableJob->addForeignKeyConstraint($tableSchedule, ['scheduled_dataflow_id'], ['id']);
return $schema;
}
}

View File

@@ -8,6 +8,8 @@ use Symfony\Component\Validator\Constraint;
/**
* @Annotation
*
* @codeCoverageIgnore
*/
class Frequency extends Constraint
{