24 Commits

Author SHA1 Message Date
jeremycr
806e5bb90f Fixed next execution for scheduled dataflows not increasing (#36) 2019-11-22 14:15:29 +01:00
jbcr
d18494212d fix example (#33)
* fix example
2019-11-19 08:18:56 +01:00
Olivier PORTIER
fbc4a20b57 Mise à jour de la commande d'installation code-rhapsodie/dataflow-bundle (#31) 2019-11-18 16:47:30 +01:00
jbcr
099cdd6579 replace dependency to doctrine/orm by doctrine/dbal (#30) 2019-11-08 16:39:09 +01:00
jbcr
206eeae297 add depentency to doctrine/doctrine-bundle (#29) 2019-11-08 14:57:52 +01:00
jbcr
c85c74fe7a make coderhapsodie.dataflow.connection as alias (#28) 2019-11-08 14:10:50 +01:00
jbcr
015e25beff fix command dependency injection (#27) 2019-11-08 11:18:22 +01:00
jbcr
c1c8db7105 fix json_decode on null (#26)
* fix json_decode on null
2019-11-08 10:44:12 +01:00
jbcr
d10642add7 add docs for v2 #21 (#24)
* add docs for v2 #21
Co-Authored-By: jeremycr <32451794+jeremycr@users.noreply.github.com>
2019-11-08 10:43:54 +01:00
jbcr
318f844ccf refactor getQueryBuilder and change visibility (#25)
* refactor getQueryBuilder and change visibility
2019-11-08 08:27:17 +01:00
jbcr
1eedeceef8 Add schema provider (#23)
add schema provider and schema command #23
2019-11-07 16:04:54 +01:00
jbcr
164e68c8ef add connection option for all command #15 (#22)
add connection option for all command #15
2019-11-07 14:34:28 +01:00
jbcr
f444d4d8c0 add factory to determine the current connexion #14 (#20)
* add factory to determine the current connexion #14
2019-11-07 11:49:00 +01:00
jbcr
e78a918af1 add configuration #12 (#18)
* add configuration #12
2019-11-07 11:23:21 +01:00
jbcr
be4cfd00a1 remove ORM and rewrite repository (#17)
remove ORM and rewrite repository #17
2019-11-07 10:51:18 +01:00
jbcr
96dcf8935d change branch alias to start v2 dev (#10) 2019-11-05 15:44:55 +01:00
jeremycr
23801f8785 Added tests (#9) 2019-10-24 15:14:43 +02:00
jbcr
689d79535c add schema and some explaining text (#8)
* add schema and some explaining text

* Apply suggestions from code review

Co-Authored-By: jeremycr <32451794+jeremycr@users.noreply.github.com>

* fix doc

* Apply suggestions from code review

Co-Authored-By: SofLesc <SofLesc@users.noreply.github.com>

* fix recomends
2019-10-24 15:14:19 +02:00
jbcr
f17322bdd8 cascade remove (#7) 2019-10-17 16:33:53 +02:00
jbcr
90855562ca fix the branch alias for master (#5) 2019-10-15 16:33:19 +02:00
jbcr
776323118a add the required dependencie seld/signal-handler (#4)
* add the requied dependencie seld/signal-handler

* downgrade to 1.0 from 1.2
2019-10-15 15:30:48 +02:00
jbcr
8371e2d1a6 fix command name (#3) 2019-10-15 15:16:03 +02:00
jeremycr
426c7b4324 Added more examples and other improvements in the doc (#2)
* Added more examples and other improvements in the doc

* Fix typo

* Updated README

* Updated README

* Typos
2019-10-11 09:06:24 +02:00
Arnaud Lafon
567a70aa84 Travis and coveralls setup (#1)
* Initial Travis setup

* Downgraded to phpunit 7 because phpunit 8 does not support php 7.1

* Fixed multiple constraint error message issue

* phpcsfixer required in dev & first fix run

* Add coveralls support
2019-10-10 14:37:16 +02:00
41 changed files with 1277 additions and 310 deletions

3
.coveralls.yml Normal file
View File

@@ -0,0 +1,3 @@
service_name : travis-ci
coverage_clover: var/build/clover.xml
json_path : var/build/upload.json

111
.travis.yml Normal file
View File

@@ -0,0 +1,111 @@
language: php
sudo: false
cache:
directories:
- $HOME/.composer/cache
branches:
only:
- master
- /^\d+\.\d+$/
- travis-setup
env:
global:
- SYMFONY_DEPRECATIONS_HELPER="max[self]=0"
- PHPUNIT_FLAGS="-v"
- PHPUNIT_ENABLED="true"
- STABILITY=stable
- COVERALLS_ENABLED="false"
matrix:
fast_finish: true
include:
- php: '7.1'
- php: '7.2'
- php: '7.3'
# Enable code coverage with the latest supported PHP version
- php: '7.3'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Minimum supported dependencies with the latest and oldest supported PHP versions
- php: '7.1'
env:
- COMPOSER_FLAGS="--prefer-lowest"
- php: '7.3'
env:
- COMPOSER_FLAGS="--prefer-lowest"
# Test each supported Symfony version with lowest supported PHP version
- php: '7.1'
env:
- SYMFONY_VERSION=3.4.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.3.*
# Test unsupported versions of Symfony
- php: '7.1'
env:
- SYMFONY_VERSION=4.0.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.1.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
- php: '7.1'
env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
# Test upcoming PHP versions with dev dependencies
- php: '7.4snapshot'
env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
allow_failures:
- env:
- SYMFONY_VERSION=4.0.*
- env:
- SYMFONY_VERSION=4.1.*
- env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
- env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
travis_retry composer global require "symfony/flex:^1.4";
composer config extra.symfony.require $SYMFONY_VERSION;
fi
- if [[ "$STABILITY" != "stable" ]]; then
travis_retry composer config minimum-stability $STABILITY;
fi
- if [[ "$COVERALLS_ENABLED" != "true" ]]; then
phpenv config-rm xdebug.ini || true;
fi
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
travis_retry composer require --dev satooshi/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
fi
install:
- travis_retry composer update --prefer-dist --no-interaction --no-suggest --no-progress --ansi $COMPOSER_FLAGS
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;

23
CHANGELOG.md Normal file
View File

@@ -0,0 +1,23 @@
# Version 2.0.0
* Add Doctrine DBAL multi-connection support
* Add configuration to define the default Doctrine DBAL connection
* Remove Doctrine ORM
* Rewrite repositories
# Version 1.0.1
* Fix lost dependency
* Fix schedule removing
# Version 1.0.0
Initial version
* Define and configure a Dataflow
* Run the Job scheduled
* Run one Dataflow from the command line
* Define the schedule for a Dataflow from the command line
* Enable/Disable a scheduled Dataflow from the command line
* Display the list of scheduled Dataflow from the command line
* Display the result for the last Job for a Dataflow from the command line

203
README.md
View File

@@ -1,8 +1,26 @@
# Code-Rhapsodie Dataflow Bundle
# Code Rhapsodie Dataflow Bundle
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
[![Build Status](https://travis-ci.org/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.org/code-rhapsodie/dataflow-bundle)
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
The steps are executed in the order in which they are added.
And, one or more writers save the row anywhere you want.
As the following schema shows, you can define more than one dataflow:
![Dataflow schema](src/Resources/doc/schema.png)
# Features
* Define and configure a Dataflow
@@ -12,6 +30,7 @@ providing an easy way to create import / export dataflow.
* Enable/Disable a scheduled Dataflow from the command line
* Display the list of scheduled Dataflow from the command line
* Display the result for the last Job for a Dataflow from the command line
* Work with multiple Doctrine DBAL connections
## Installation
@@ -21,7 +40,7 @@ providing an easy way to create import / export dataflow.
To install this bundle, run this command :
```shell script
$ composer require code-rhapsodie/dataflow
$ composer require code-rhapsodie/dataflow-bundle
```
#### Suggest
@@ -80,42 +99,48 @@ public function registerBundles()
### Update the database
This bundle use Dotrine ORM for drive the database table for store Dataflow schedule (`cr_dataflow_scheduled`)
This bundle uses Doctrine DBAL to store Dataflow schedule into the database table (`cr_dataflow_scheduled`)
and jobs (`cr_dataflow_job`).
#### Doctrine migration
Execute the command for generate the migration for your database:
```shell script
$ bin/console doctrine:migration:diff
```
#### Other migration tools
If you use [Phinx](https://phinx.org/) or [Kaliop Migration Bundle](https://github.com/kaliop-uk/ezmigrationbundle) or whatever,
If you use [Doctrine Migration Bundle](https://symfony.com/doc/master/bundles/DoctrineMigrationsBundle/index.html) or [Phinx](https://phinx.org/)
or [Kaliop Migration Bundle](https://github.com/kaliop-uk/ezmigrationbundle) or whatever,
you can add a new migration with the generated SQL query from this command:
```shell script
$ bin/console doctrine:schema:update --dump-sql
$ bin/console code-rhapsodie:dataflow:dump-schema
```
If you have already the tables, you can add a new migration with the generated update SQL query from this command:
```shell script
$ bin/console code-rhapsodie:dataflow:dump-schema --update
```
## Configuration
By default, the Doctrine DBAL connection used is `default`. You can configure the default connection.
Add this configuration into your Symfony configuration:
```yaml
code_rhapsodie_dataflow:
dbal_default_connection: test #Name of the default connection used by Dataflow bundle
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
A dataflow type defines the different parts of your dataflow. A dataflow is comprised of:
A dataflow type defines the different parts of your dataflow. A dataflow is made of:
- exactly one *Reader*
- any number of *Steps*
- one or more *Writers*
Dataflow types can be configured with options.
A dataflow type must implements `CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface`.
A dataflow type must implement `CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface`.
To help with creating your workflow types, an abstract class `CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType`
is provided, allowing you to define your dataflow through an handy builder `CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder`.
To help with creating your dataflow types, an abstract class `CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType`
is provided, allowing you to define your dataflow through a handy builder `CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder`.
This is an example to define one class DataflowType:
@@ -142,11 +167,11 @@ class MyFirstDataflowType extends AbstractDataflowType
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$this->myReader->setFilename($options['fileName']);
$this->myWriter->setDestinationFilePath($options['to-file']);
$builder->setReader($this->myReader)
->addStep(function($data) use ($options) {
$builder
->setReader($this->myReader->read($options['from-file']))
->addStep(function ($data) use ($options) {
// TODO : Write your code here...
return $data;
})
@@ -156,11 +181,8 @@ class MyFirstDataflowType extends AbstractDataflowType
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefaults([
'my_option' => 'my_default_value',
'fileName' => null,
]);
$optionsResolver->setRequired('fileName');
$optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
$optionsResolver->setRequired('from-file');
}
public function getLabel(): string
@@ -176,8 +198,11 @@ class MyFirstDataflowType extends AbstractDataflowType
```
The `DataflowTypeInterface` is used by Symfony for auto-configuration our custom datafow type only if the folder is correctly configured (see the `services` configuration file in your projet).
If you don't use the auto-configuration, you must add this tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
Dataflow types must be tagged with `coderhapsodie.dataflow.type`.
If you're using Symfony auto-configuration for your services, this tag will be automatically added to all services implementing `DataflowTypeInterface`.
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
@@ -185,9 +210,9 @@ If you don't use the auto-configuration, you must add this tag `coderhapsodie.da
- { name: coderhapsodie.dataflow.type }
```
### Use the options for your dataflow type
### Use options for your dataflow type
The `AbstractDataflowType` can help you define the options of your Datataflow type.
The `AbstractDataflowType` can help you define options for your Dataflow type.
Add this method in your DataflowType class:
@@ -201,11 +226,8 @@ class MyFirstDataflowType extends AbstractDataflowType
// ...
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefaults([
'my_option' => 'my_default_value',
'fileName' => null,
]);
$optionsResolver->setRequired('fileName');
$optionsResolver->setDefaults(['to-file' => '/tmp/dataflow.csv', 'from-file' => null]);
$optionsResolver->setRequired('from-file');
}
}
@@ -213,7 +235,6 @@ class MyFirstDataflowType extends AbstractDataflowType
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
### Check if your DataflowType is ready
Execute this command to check if your DataflowType is correctly registered:
@@ -239,9 +260,9 @@ Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflo
### Readers
*Readers* provide the workflow with elements to import / export. Usually, elements are read from an external resource (file, database, webservice, etc).
*Readers* provide the dataflow with elements to import / export. Usually, elements are read from an external resource (file, database, webservice, etc).
A *Reader* must implements `Port\Reader` or return a `iterable` if you use the `Port\Reader\IteratorReader`.
A *Reader* can be any `iterable`.
The only constraint on the returned elements typing is that they cannot be `false`.
@@ -254,36 +275,27 @@ namespace CodeRhapsodie\DataflowExemple\Reader;
class FileReader
{
private $filename;
/**
* Set the filename option needed by the Reader.
*/
public function setFilename(string $filename) {
$this->filename = $filename;
}
public function __invoke(): iterable
public function read(string $filename): iterable
{
if (!$this->filename) {
if (!$filename) {
throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
}
if (!$fh = fopen($this->filename, 'r')) {
throw new \Exception("Unable to open file '".$this->filename."' for read.");
if (!$fh = fopen($filename, 'r')) {
throw new \Exception("Unable to open file '".$filename."' for read.");
}
while (false === ($read = fread($fh, 1024))) {
yield explode("|", $read);
while (false !== ($read = fgets($fh))) {
yield explode('|', trim($read));
}
}
}
```
To setup your reader in the dataflow builder, you must use `Port\Reader\IteratorReader` like this
You can set up this reader as follows:
```php
$builder->setReader(new \Port\Reader\IteratorReader($this->myReader))
$builder->setReader(($this->myReader)())
```
@@ -291,18 +303,40 @@ $builder->setReader(new \Port\Reader\IteratorReader($this->myReader))
*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
- converters, that alter the element
- filters, that conditionally prevents further operations on the element
- filters, that conditionally prevent further operations on the element
A *Step* can be any callable, taking the element as its argument, and returning either:
- the element, possibly altered
- `false`, if no further operations should be performed on this element
A few examples:
```php
<?php
//[...]
$builder->addStep(function ($item) {
// Titles are changed to all caps before export
$item['title'] = strtoupper($item['title']);
return $item;
});
$builder->addStep(function ($item) {
// Private items are not exported
if ($item['private']) {
return false;
}
return $item;
});
//[...]
```
### Writers
*Writers* performs the actual import / export operations.
*Writers* perform the actual import / export operations.
A *Writer* must implements `CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface`.
A *Writer* must implement `CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface`.
As this interface is not compatible with `Port\Writer`, the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` is provided.
This example show how to use the predefined PhpPort Writer :
@@ -311,7 +345,7 @@ This example show how to use the predefined PhpPort Writer :
$builder->addWriter(new PortWriterAdapter(new \Port\FileWriter()));
```
Or you own Writer:
Or your own Writer:
```php
<?php
@@ -323,11 +357,20 @@ class FileWriter implements WriterInterface
{
private $fh;
/** @var string */
private $path;
public function setDestinationFilePath(string $path) {
$this->path = $path;
}
public function prepare()
{
if (!$this->fh = fopen('/path/to/file', 'w')) {
throw new \Exception("Unable to open in write mode the output file.");
if (null === $this->path) {
throw new \Exception('Define the destination file name before use');
}
if (!$this->fh = fopen($this->path, 'w')) {
throw new \Exception('Unable to open in write mode the output file.');
}
}
@@ -347,17 +390,17 @@ class FileWriter implements WriterInterface
All pending dataflow job processes are stored in a queue into the database.
Add this command into your crontab for execute all queued job:
Add this command into your crontab for execute all queued jobs:
```shell script
$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:job:run-pending
$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:run-pending
```
## Commands
Many commands are provided.
Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:job:run-pending` Executes job in the queue according to their schedule.
`code-rhapsodie:dataflow:run-pending` Executes job in the queue according to their schedule.
`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
@@ -367,8 +410,30 @@ Many commands are provided.
`code-rhapsodie:dataflow:job:show` Display the last result of a job.
`code-rhapsodie:dataflow:execute` Lets you execute one dataflow job.
`code-rhapsodie:dataflow:execute` Let you execute one dataflow job.
`code-rhapsodie:dataflow:dump-schema` Generates schema create / update SQL queries
### Work with many databases
All commands have a `--connection` option to define what Doctrine DBAL connection to use during execution.
Example:
This command uses the `default` DBAL connection to generate all schema update queries.
```shell script
$ bin/console code-rhapsodie:dataflow:dump-schema --update --connection=default
```
To execute all pending job for a specific connection use:
```shell script
# Run for dataflow DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=dataflow
# Run for default DBAL connection
$ bin/console code-rhapsodie:dataflow:run-pending --connection=default
```
# Issues and feature requests

View File

@@ -0,0 +1,53 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use PHPUnit\Framework\Constraint\IsIdentical;
use PHPUnit\Framework\TestCase;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AbstractDataflowTypeTest extends TestCase
{
public function testProcess()
{
$label = 'Test label';
$options = ['testOption' => 'Test value'];
$values = [1, 2, 3];
$dataflowType = new class($label, $options, $values) extends AbstractDataflowType
{
private $label;
private $options;
private $values;
public function __construct(string $label, array $options, array $values)
{
$this->label = $label;
$this->options = $options;
$this->values = $values;
}
public function getLabel(): string
{
return $this->label;
}
protected function configureOptions(OptionsResolver $optionsResolver): void
{
$optionsResolver->setDefined('testOption');
}
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$builder->setReader($this->values);
(new IsIdentical($this->options))->evaluate($options);
}
};
$result = $dataflowType->process($options);
$this->assertSame($label, $result->getName());
$this->assertSame(count($values), $result->getTotalProcessedCount());
}
}

View File

@@ -0,0 +1,37 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter;
use PHPUnit\Framework\TestCase;
class PortWriterAdapterTest extends TestCase
{
public function testAll()
{
$value = 'not an array';
$writer = $this->getMockBuilder('\Port\Writer')
->setMethods(['prepare', 'finish', 'writeItem'])
->getMock()
;
$writer
->expects($this->once())
->method('prepare')
;
$writer
->expects($this->once())
->method('finish')
;
$writer
->expects($this->once())
->method('writeItem')
->with([$value])
;
$adapter = new PortWriterAdapter($writer);
$adapter->prepare();
$adapter->write($value);
$adapter->finish();
}
}

View File

@@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\ORM\EntityManagerInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
@@ -18,8 +19,8 @@ class ScheduledDataflowManagerTest extends TestCase
/** @var ScheduledDataflowManager */
private $manager;
/** @var EntityManagerInterface|MockObject */
private $em;
/** @var Connection|MockObject */
private $connection;
/** @var ScheduledDataflowRepository|MockObject */
private $scheduledDataflowRepository;
@@ -29,17 +30,18 @@ class ScheduledDataflowManagerTest extends TestCase
protected function setUp(): void
{
$this->em = $this->createMock(EntityManagerInterface::class);
$this->connection = $this->createMock(Connection::class);
$this->scheduledDataflowRepository = $this->createMock(ScheduledDataflowRepository::class);
$this->jobRepository = $this->createMock(JobRepository::class);
$this->manager = new ScheduledDataflowManager($this->em, $this->scheduledDataflowRepository, $this->jobRepository);
$this->manager = new ScheduledDataflowManager($this->connection, $this->scheduledDataflowRepository, $this->jobRepository);
}
public function testCreateJobsFromScheduledDataflows()
{
$scheduled1 = new ScheduledDataflow();
$scheduled2 = (new ScheduledDataflow())
->setId(-1)
->setDataflowType($type = 'testType')
->setOptions($options = ['opt' => 'val'])
->setNext($next = new \DateTime())
@@ -60,9 +62,13 @@ class ScheduledDataflowManagerTest extends TestCase
->willReturnOnConsecutiveCalls(new Job(), null)
;
$this->em
$this->connection
->expects($this->once())
->method('persist')
->method('beginTransaction')
;
$this->jobRepository
->expects($this->once())
->method('save')
->with(
$this->callback(function (Job $job) use ($type, $options, $next, $label, $scheduled2) {
return (
@@ -71,15 +77,21 @@ class ScheduledDataflowManagerTest extends TestCase
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflow() === $scheduled2
&& $job->getScheduledDataflowId() === $scheduled2->getId()
);
})
)
;
$this->em
$this->scheduledDataflowRepository
->expects($this->once())
->method('flush')
->method('save')
->with($scheduled2)
;
$this->connection
->expects($this->once())
->method('commit')
;
$this->manager->createJobsFromScheduledDataflows();

View File

@@ -20,9 +20,6 @@ class PendingDataflowRunnerTest extends TestCase
/** @var PendingDataflowRunner */
private $runner;
/** @var EntityManagerInterface|MockObject */
private $em;
/** @var JobRepository|MockObject */
private $repository;
@@ -34,12 +31,11 @@ class PendingDataflowRunnerTest extends TestCase
protected function setUp(): void
{
$this->em = $this->createMock(EntityManagerInterface::class);
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->runner = new PendingDataflowRunner($this->em, $this->repository, $this->registry, $this->dispatcher);
$this->runner = new PendingDataflowRunner($this->repository, $this->registry, $this->dispatcher);
}
public function testRunPendingDataflows()
@@ -103,10 +99,8 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
;
$bag1 = new \SplObjectStorage();
$bag1->attach(new \Exception('message1'));
$bag2 = new \SplObjectStorage();
$bag2->attach(new \Exception('message2'));
$bag1 = [new \Exception('message1')];
$bag2 = [new \Exception('message2')];
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
@@ -124,9 +118,9 @@ class PendingDataflowRunnerTest extends TestCase
->willReturn($result2)
;
$this->em
$this->repository
->expects($this->exactly(4))
->method('flush')
->method('save')
;
$this->runner->runPendingDataflows();

View File

@@ -36,7 +36,7 @@ class FrequencyValidatorTest extends ConstraintValidatorTestCase
public function testInvalidValue()
{
$constraint = new Frequency([
'invalidMessage' => 'testMessage',
'message' => 'testMessage',
]);
$this->validator->validate('wrong value', $constraint);
@@ -53,7 +53,7 @@ class FrequencyValidatorTest extends ConstraintValidatorTestCase
public function testNegativeIntervals($value)
{
$constraint = new Frequency([
'negativeIntervalMessage' => 'testMessage',
'message' => 'testMessage',
]);
$this->validator->validate($value, $constraint);

3
UPGRADE.md Normal file
View File

@@ -0,0 +1,3 @@
# Upgrade from v1.x to v2.0
[BC] `JobRepository` and `ScheduledDataflowRepository` are no longer a Doctrine ORM repository.

View File

@@ -37,21 +37,33 @@
},
"require": {
"php": "^7.1",
"symfony/dependency-injection": "^3.4||^4.0",
"symfony/http-kernel": "^3.4||^4.0",
"doctrine/dbal": "^2.0",
"seld/signal-handler": "^1.0",
"symfony/config": "^3.4||^4.0",
"symfony/yaml": "^3.4||^4.0",
"symfony/console": "^3.4||^4.0",
"symfony/lock": "^3.4||^4.0",
"symfony/dependency-injection": "^3.4||^4.0",
"symfony/event-dispatcher": "^3.4||^4.0",
"symfony/validator": "^3.4||^4.0",
"symfony/http-kernel": "^3.4||^4.0",
"symfony/lock": "^3.4||^4.0",
"symfony/options-resolver": "^3.4||^4.0",
"doctrine/orm": "^2.4.5"
"symfony/validator": "^3.4||^4.0",
"symfony/yaml": "^3.4||^4.0",
"doctrine/doctrine-bundle": "^1.0"
},
"require-dev": {
"phpunit/phpunit": "^8.1"
"friendsofphp/php-cs-fixer": "^2.15",
"phpunit/phpunit": "^7||^8"
},
"suggest": {
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows."
},
"config": {
"sort-packages": true
},
"extra": {
"branch-alias": {
"dev-master": "2.x-dev",
"dev-v1.x": "1.x-dev"
}
}
}

View File

@@ -14,7 +14,7 @@
<ini name="error_reporting" value="-1" />
</php>
<testsuites>
<testsuite name="Port tests suite">
<testsuite name="Dataflow tests suite">
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>

View File

@@ -9,6 +9,9 @@ use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompil
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;
/**
* @codeCoverageIgnore
*/
class CodeRhapsodieDataflowBundle extends Bundle
{
protected $name = 'CodeRhapsodieDataflowBundle';

View File

@@ -13,7 +13,11 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Validator\Validator\ValidatorInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
*/
class AddScheduledDataflowCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
@@ -25,13 +29,17 @@ class AddScheduledDataflowCommand extends Command
/** @var ValidatorInterface */
private $validator;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->validator = $validator;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -44,10 +52,12 @@ class AddScheduledDataflowCommand extends Command
->setHelp('The <info>%command.name%</info> allows you to create a new scheduled dataflow.')
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
->addOption('options', null, InputOption::VALUE_OPTIONAL, 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
->addOption('options', null, InputOption::VALUE_OPTIONAL,
'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow');
->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -55,6 +65,9 @@ class AddScheduledDataflowCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$choices = [];
$typeMapping = [];
foreach ($this->registry->listDataflowTypes() as $fqcn => $dataflowType) {
@@ -74,11 +87,13 @@ class AddScheduledDataflowCommand extends Command
}
$options = $input->getOption('options');
if (!$options) {
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', json_encode([]));
$options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})',
json_encode([]));
}
$frequency = $input->getOption('frequency');
if (!$frequency) {
$frequency = $io->choice('What is the frequency for the scheduled dataflow?', ScheduledDataflow::AVAILABLE_FREQUENCIES);
$frequency = $io->choice('What is the frequency for the scheduled dataflow?',
ScheduledDataflow::AVAILABLE_FREQUENCIES);
}
$firstRun = $input->getOption('first_run');
if (!$firstRun) {
@@ -89,44 +104,27 @@ class AddScheduledDataflowCommand extends Command
$enabled = $io->confirm('Enable the scheduled dataflow?');
}
try {
$newScheduledDataflow = $this->createEntityFromArray([
'label' => $label,
'type' => $type,
'options' => $options,
'frequency' => $frequency,
'first_run' => $firstRun,
'enabled' => $enabled,
]);
$newScheduledDataflow = ScheduledDataflow::createFromArray([
'id' => null,
'label' => $label,
'dataflow_type' => $type,
'options' => json_decode($options, true),
'frequency' => $frequency,
'next' => new \DateTimeImmutable($firstRun),
'enabled' => $enabled,
]);
$errors = $this->validator->validate($newScheduledDataflow);
if (count($errors) > 0) {
$io->error((string) $errors);
$errors = $this->validator->validate($newScheduledDataflow);
if (count($errors) > 0) {
$io->error((string) $errors);
return 2;
}
$this->scheduledDataflowRepository->save($newScheduledDataflow);
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.', $newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
return 0;
} catch (\Exception $e) {
$io->error(sprintf('An error occured when creating new scheduled dataflow : "%s".', $e->getMessage()));
return 1;
return 2;
}
}
private function createEntityFromArray(array $input): ScheduledDataflow
{
$scheduledDataflow = new ScheduledDataflow();
$scheduledDataflow->setLabel($input['label']);
$scheduledDataflow->setDataflowType($input['type']);
$scheduledDataflow->setOptions(json_decode($input['options'], true));
$scheduledDataflow->setFrequency($input['frequency']);
$scheduledDataflow->setNext(new \DateTimeImmutable($input['first_run']));
$scheduledDataflow->setEnabled($input['enabled']);
$this->scheduledDataflowRepository->save($newScheduledDataflow);
$io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.',
$newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
return $scheduledDataflow;
return 0;
}
}

View File

@@ -12,7 +12,11 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
*/
class ChangeScheduleStatusCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
@@ -20,11 +24,15 @@ class ChangeScheduleStatusCommand extends Command
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -37,7 +45,8 @@ class ChangeScheduleStatusCommand extends Command
->setHelp('The <info>%command.name%</info> command able you to change schedule status.')
->addArgument('schedule-id', InputArgument::REQUIRED, 'Id of the schedule')
->addOption('enable', null, InputOption::VALUE_NONE, 'Enable the schedule')
->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule');
->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -45,9 +54,12 @@ class ChangeScheduleStatusCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
/** @var ScheduledDataflow|null $schedule */
$schedule = $this->scheduledDataflowRepository->find($input->getArgument('schedule-id'));
$schedule = $this->scheduledDataflowRepository->find((int) $input->getArgument('schedule-id'));
if (!$schedule) {
$io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));

View File

@@ -8,10 +8,14 @@ use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* Runs one dataflow.
*
* @codeCoverageIgnore
*/
class ExecuteDataflowCommand extends Command
{
@@ -20,11 +24,15 @@ class ExecuteDataflowCommand extends Command
/** @var DataflowTypeRegistryInterface */
private $registry;
public function __construct(DataflowTypeRegistryInterface $registry)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -42,7 +50,7 @@ EOF
)
->addArgument('fqcn', InputArgument::REQUIRED, 'FQCN or alias of the dataflow type')
->addArgument('options', InputArgument::OPTIONAL, 'Options for the dataflow type as a json string', '[]')
;
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -50,6 +58,9 @@ EOF
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);

View File

@@ -11,7 +11,11 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
*/
class JobShowCommand extends Command
{
private const STATUS_MAPPING = [
@@ -25,11 +29,15 @@ class JobShowCommand extends Command
/** @var JobRepository */
private $jobRepository;
public function __construct(JobRepository $jobRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(JobRepository $jobRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->jobRepository = $jobRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -42,7 +50,8 @@ class JobShowCommand extends Command
->setHelp('The <info>%command.name%</info> display job details for schedule or specific job.')
->addOption('job-id', null, InputOption::VALUE_REQUIRED, 'Id of the job to get details')
->addOption('schedule-id', null, InputOption::VALUE_REQUIRED, 'Id of schedule for last execution details')
->addOption('details', null, InputOption::VALUE_NONE, 'Display full details');
->addOption('details', null, InputOption::VALUE_NONE, 'Display full details')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -50,6 +59,10 @@ class JobShowCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
$jobId = (int) $input->getOption('job-id');

View File

@@ -9,10 +9,14 @@ use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* Runs dataflows according to user-defined schedule.
*
* @codeCoverageIgnore
*/
class RunPendingDataflowsCommand extends Command
{
@@ -26,12 +30,16 @@ class RunPendingDataflowsCommand extends Command
/** @var PendingDataflowRunnerInterface */
private $runner;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->manager = $manager;
$this->runner = $runner;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -45,7 +53,7 @@ class RunPendingDataflowsCommand extends Command
The <info>%command.name%</info> command runs dataflows according to the schedule defined in the UI by the user.
EOF
)
;
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -59,6 +67,10 @@ EOF
return 0;
}
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$this->manager->createJobsFromScheduledDataflows();
$this->runner->runPendingDataflows();

View File

@@ -7,9 +7,14 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
*/
class ScheduleListCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
@@ -17,11 +22,15 @@ class ScheduleListCommand extends Command
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository)
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -31,7 +40,8 @@ class ScheduleListCommand extends Command
{
$this
->setDescription('List scheduled dataflows')
->setHelp('The <info>%command.name%</info> lists all scheduled dataflows.');
->setHelp('The <info>%command.name%</info> lists all scheduled dataflows.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
/**
@@ -39,6 +49,9 @@ class ScheduleListCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$io = new SymfonyStyle($input, $output);
$display = [];
$schedules = $this->scheduledDataflowRepository->listAllOrderedByLabel();
@@ -48,7 +61,7 @@ class ScheduleListCommand extends Command
$schedule['label'],
$schedule['enabled'] ? 'yes' : 'no',
$schedule['startTime'] ? (new \DateTime($schedule['startTime']))->format('Y-m-d H:i:s') : '-',
$schedule['next'] ? $schedule['next']->format('Y-m-d H:i:s') : '-',
$schedule['next'] ? (new \DateTime($schedule['next']))->format('Y-m-d H:i:s') : '-',
];
}

View File

@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
*/
class SchemaCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:dump-schema';
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->connectionFactory = $connectionFactory;
}
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setDescription('Generates schema create / update SQL queries')
->setHelp('The <info>%command.name%</info> help you to generate SQL Query to create or update your database schema for this bundle')
->addOption('update', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use')
;
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$connection = $this->connectionFactory->getConnection();
$schemaProvider = new DataflowSchemaProvider();
$schema = $schemaProvider->createSchema();
$sqls = $schema->toSql($connection->getDatabasePlatform());
if ($input->getOption('update')) {
$sm = $connection->getSchemaManager();
$tableArray = [JobRepository::TABLE_NAME, ScheduledDataflowRepository::TABLE_NAME];
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
if (in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
$namespaces = [];
if ($connection->getDatabasePlatform()->supportsSchemas()) {
$namespaces = $sm->listNamespaceNames();
}
$sequences = [];
if ($connection->getDatabasePlatform()->supportsSequences()) {
$sequences = $sm->listSequences();
}
$oldSchema = new Schema($tables, $sequences, $sm->createSchemaConfig(), $namespaces);
$sqls = $schema->getMigrateFromSql($oldSchema, $connection->getDatabasePlatform());
}
$io = new SymfonyStyle($input, $output);
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql);
}
}
}

View File

@@ -8,6 +8,9 @@ use Symfony\Component\OptionsResolver\OptionsResolver;
abstract class AbstractDataflowType implements DataflowTypeInterface
{
/**
* @codeCoverageIgnore
*/
public function getAliases(): iterable
{
return [];
@@ -28,6 +31,9 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
return $dataflow->process();
}
/**
* @codeCoverageIgnore
*/
protected function configureOptions(OptionsResolver $optionsResolver): void
{
}

View File

@@ -6,8 +6,6 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\InterruptedProcessingException;
use Seld\Signal\SignalHandler;
class Dataflow implements DataflowInterface
{
@@ -25,7 +23,7 @@ class Dataflow implements DataflowInterface
/**
* @param iterable $reader
* @param null|string $name
* @param string|null $name
*/
public function __construct(iterable $reader, ?string $name)
{
@@ -63,13 +61,9 @@ class Dataflow implements DataflowInterface
public function process(): Result
{
$count = 0;
$exceptions = new \SplObjectStorage();
$exceptions = [];
$startTime = new \DateTime();
SignalHandler::create(['SIGTERM', 'SIGINT'], function () {
throw new InterruptedProcessingException();
});
foreach ($this->writers as $writer) {
$writer->prepare();
}
@@ -78,7 +72,7 @@ class Dataflow implements DataflowInterface
try {
$this->processItem($item);
} catch (\Exception $e) {
$exceptions->attach($e, $index);
$exceptions[$index] = $e;
}
++$count;

View File

@@ -4,6 +4,9 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
/**
* @codeCoverageIgnore
*/
class Result
{
/** @var string */
@@ -27,7 +30,7 @@ class Result
/** @var int */
private $totalProcessedCount = 0;
/** @var \SplObjectStorage */
/** @var array */
private $exceptions;
/**
@@ -37,7 +40,7 @@ class Result
* @param int $totalCount
* @param \SplObjectStorage $exceptions
*/
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, \SplObjectStorage $exceptions)
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
{
$this->name = $name;
$this->startTime = $startTime;
@@ -114,9 +117,9 @@ class Result
}
/**
* @return \SplObjectStorage
* @return array
*/
public function getExceptions(): \SplObjectStorage
public function getExceptions(): array
{
return $this->exceptions;
}

View File

@@ -10,6 +10,9 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\Extension;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
/**
* @codeCoverageIgnore
*/
class CodeRhapsodieDataflowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
@@ -21,5 +24,9 @@ class CodeRhapsodieDataflowExtension extends Extension
->registerForAutoconfiguration(DataflowTypeInterface::class)
->addTag('coderhapsodie.dataflow.type')
;
$configuration = new Configuration();
$config = $this->processConfiguration($configuration, $configs);
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
}
}

View File

@@ -11,6 +11,8 @@ use Symfony\Component\DependencyInjection\Reference;
/**
* Registers dataflow types in the registry.
*
* @codeCoverageIgnore
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{

View File

@@ -0,0 +1,27 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder()
{
$treeBuilder = new TreeBuilder();
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
$rootNode
->children()
->scalarNode('dbal_default_connection')
->defaultValue('default')
->end()
->end()
;
return $treeBuilder;
}
}

View File

@@ -4,13 +4,12 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Entity;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Validator\Constraints as Asserts;
/**
* Dataflow execution status.
*
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\JobRepository")
* @ORM\Table(name="cr_dataflow_job")
* @codeCoverageIgnore
*/
class Job
{
@@ -18,83 +17,84 @@ class Job
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
private const KEYS = [
'id',
'status',
'label',
'dataflow_type',
'options',
'requested_date',
'scheduled_dataflow_id',
'count',
'exceptions',
'start_time',
'end_time',
];
/**
* @var int
*
* @ORM\Id()
* @ORM\Column(type="integer")
* @ORM\GeneratedValue(strategy="AUTO")
* @var int|null
*/
private $id;
/**
* @var int
*
* @ORM\Column(type="integer")
* @Asserts\Range(min=0, max=2)
*/
private $status;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*
* @ORM\Column(type="json")
*/
private $options;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
* @Asserts\DateTime()
*/
private $requestedDate;
/**
* @var ScheduledDataflow|null
*
* @ORM\ManyToOne(targetEntity="ScheduledDataflow", inversedBy="jobs")
* @ORM\JoinColumn(nullable=true)
* @var int|null
*/
private $scheduledDataflow;
private $scheduledDataflowId;
/**
* @var int|null
*
* @ORM\Column(type="integer", nullable=true)
*/
private $count;
/**
* @var array|null
*
* @ORM\Column(type="json", nullable=true)
*/
private $exceptions;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $startTime;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $endTime;
@@ -111,14 +111,72 @@ class Job
->setOptions($scheduled->getOptions())
->setRequestedDate(clone $scheduled->getNext())
->setLabel($scheduled->getLabel())
->setScheduledDataflow($scheduled)
;
->setScheduledDataflowId($scheduled->getId());
}
public function __construct()
{
$this->count = 0;
$this->status = static::STATUS_PENDING;
}
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
$lost).'"');
}
$job = new self();
$job->id = null === $datas['id'] ? null : (int) $datas['id'];
$job->setStatus(null === $datas['status'] ? null : (int) $datas['status']);
$job->setLabel($datas['label']);
$job->setDataflowType($datas['dataflow_type']);
$job->setOptions($datas['options']);
$job->setRequestedDate($datas['requested_date']);
$job->setScheduledDataflowId(null === $datas['scheduled_dataflow_id'] ? null : (int) $datas['scheduled_dataflow_id']);
$job->setCount(null === $datas['count'] ? null : (int) $datas['count']);
$job->setExceptions($datas['exceptions']);
$job->setStartTime($datas['start_time']);
$job->setEndTime($datas['end_time']);
return $job;
}
public function toArray(): array
{
return [
'id' => $this->getId(),
'status' => $this->getStatus(),
'label' => $this->getLabel(),
'dataflow_type' => $this->getDataflowType(),
'options' => $this->getOptions(),
'requested_date' => $this->getRequestedDate(),
'scheduled_dataflow_id' => $this->getScheduledDataflowId(),
'count' => $this->getCount(),
'exceptions' => $this->getExceptions(),
'start_time' => $this->getStartTime(),
'end_time' => $this->getEndTime(),
];
}
/**
* @return int
* @param int $id
*
* @return Job
*/
public function getId(): int
public function setId(int $id): Job
{
$this->id = $id;
return $this;
}
/**
* @return int|null
*/
public function getId(): ?int
{
return $this->id;
}
@@ -144,7 +202,7 @@ class Job
}
/**
* @return null|string
* @return string|null
*/
public function getLabel(): ?string
{
@@ -152,7 +210,7 @@ class Job
}
/**
* @param null|string $label
* @param string|null $label
*
* @return Job
*/
@@ -164,7 +222,7 @@ class Job
}
/**
* @return null|string
* @return string|null
*/
public function getDataflowType(): ?string
{
@@ -172,7 +230,7 @@ class Job
}
/**
* @param null|string $dataflowType
* @param string|null $dataflowType
*
* @return Job
*/
@@ -224,21 +282,21 @@ class Job
}
/**
* @return ScheduledDataflow|null
* @return int|null
*/
public function getScheduledDataflow(): ?ScheduledDataflow
public function getScheduledDataflowId(): ?int
{
return $this->scheduledDataflow;
return $this->scheduledDataflowId;
}
/**
* @param ScheduledDataflow|null $scheduledDataflow
* @param int|null $scheduledDataflowId
*
* @return Job
*/
public function setScheduledDataflow(?ScheduledDataflow $scheduledDataflow): Job
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
{
$this->scheduledDataflow = $scheduledDataflow;
$this->scheduledDataflowId = $scheduledDataflowId;
return $this;
}

View File

@@ -5,13 +5,12 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Entity;
use CodeRhapsodie\DataflowBundle\Validator\Constraints\Frequency;
use Doctrine\ORM\Mapping as ORM;
use Symfony\Component\Validator\Constraints as Asserts;
/**
* Schedule for a regular execution of a dataflow.
*
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository")
* @ORM\Table(name="cr_dataflow_scheduled")
* @codeCoverageIgnore
*/
class ScheduledDataflow
{
@@ -22,77 +21,110 @@ class ScheduledDataflow
'1 month',
];
private const KEYS = ['id', 'label', 'dataflow_type', 'options', 'frequency', 'next', 'enabled'];
/**
* @var int
*
* @ORM\Id()
* @ORM\Column(name="id", type="integer")
* @ORM\GeneratedValue(strategy="AUTO")
* @var int|null
*/
private $id;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @ORM\Column(type="string")
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*
* @ORM\Column(type="json")
*/
private $options;
/**
* @var string|null
*
* @ORM\Column(type="string")
*
* @Asserts\NotBlank()
* @Frequency()
*/
private $frequency;
/**
* @var \DateTimeInterface|null
*
* @ORM\Column(type="datetime", nullable=true)
*/
private $next;
/**
* @var bool|null
*
* @ORM\Column(type="boolean")
*/
private $enabled;
/**
* @var Job[]
*
* @ORM\OneToMany(targetEntity="Job", mappedBy="scheduledDataflow", cascade={"persist"})
* @ORM\OrderBy({"startTime" = "DESC"})
*/
private $jobs;
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
$lost).'"');
}
$scheduledDataflow = new self();
$scheduledDataflow->id = null === $datas['id'] ? null : (int) $datas['id'];
$scheduledDataflow->setLabel($datas['label']);
$scheduledDataflow->setDataflowType($datas['dataflow_type']);
$scheduledDataflow->setOptions($datas['options']);
$scheduledDataflow->setFrequency($datas['frequency']);
$scheduledDataflow->setNext($datas['next']);
$scheduledDataflow->setEnabled(null === $datas['enabled'] ? null : (bool) $datas['enabled']);
return $scheduledDataflow;
}
public function toArray(): array
{
return [
'id' => $this->getId(),
'label' => $this->getLabel(),
'dataflow_type' => $this->getDataflowType(),
'options' => $this->getOptions(),
'frequency' => $this->getFrequency(),
'next' => $this->getNext(),
'enabled' => $this->getEnabled(),
];
}
/**
* @return int
* @param int $id
*
* @return ScheduledDataflow
*/
public function getId(): int
public function setId(int $id): ScheduledDataflow
{
$this->id = $id;
return $this;
}
/**
* @return int|null
*/
public function getId(): ?int
{
return $this->id;
}
/**
* @return null|string
* @return string|null
*/
public function getLabel(): ?string
{
@@ -100,7 +132,7 @@ class ScheduledDataflow
}
/**
* @param null|string $label
* @param string|null $label
*
* @return ScheduledDataflow
*/
@@ -112,7 +144,7 @@ class ScheduledDataflow
}
/**
* @return null|string
* @return string|null
*/
public function getDataflowType(): ?string
{
@@ -120,7 +152,7 @@ class ScheduledDataflow
}
/**
* @param null|string $dataflowType
* @param string|null $dataflowType
*
* @return ScheduledDataflow
*/

View File

@@ -9,6 +9,8 @@ use Symfony\Component\EventDispatcher\Event;
/**
* Event used during the dataflow lifecycle.
*
* @codeCoverageIgnore
*/
class ProcessingEvent extends Event
{

View File

@@ -0,0 +1,35 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Factory;
use Symfony\Component\DependencyInjection\Container;
/**
* Class ConnectionFactory.
*
* @codeCoverageIgnore
*/
class ConnectionFactory
{
private $connectionName;
private $container;
public function __construct(Container $container, string $connectionName)
{
$this->connectionName = $connectionName;
$this->container = $container;
}
public function setConnectionName(string $connectionName)
{
$this->connectionName = $connectionName;
}
public function getConnection(): \Doctrine\DBAL\Driver\Connection
{
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}
}

View File

@@ -8,25 +8,25 @@ use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\DBAL\Driver\Connection;
/**
* Handles scheduled dataflows execution dates based on their frequency.
*/
class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
{
/** @var EntityManagerInterface */
private $em;
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var JobRepository */
private $jobRepository;
public function __construct(EntityManagerInterface $em, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
/** @var Connection */
private $connection;
public function __construct(Connection $connection, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
{
$this->em = $em;
$this->connection = $connection;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->jobRepository = $jobRepository;
}
@@ -36,16 +36,21 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
*/
public function createJobsFromScheduledDataflows(): void
{
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
continue;
$this->connection->beginTransaction();
try {
foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
continue;
}
$this->createPendingForScheduled($scheduled);
$this->updateScheduledDataflowNext($scheduled);
}
$this->createPendingForScheduled($scheduled);
$this->updateScheduledDataflowNext($scheduled);
} catch (\Throwable $e) {
$this->connection->rollBack();
throw $e;
}
$this->em->flush();
$this->connection->commit();
}
/**
@@ -62,6 +67,7 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
}
$scheduled->setNext($next);
$this->scheduledDataflowRepository->save($scheduled);
}
/**
@@ -69,6 +75,6 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
*/
private function createPendingForScheduled(ScheduledDataflow $scheduled): void
{
$this->em->persist(Job::createFromScheduledDataflow($scheduled));
$this->jobRepository->save(Job::createFromScheduledDataflow($scheduled));
}
}

View File

@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw new UnknownDataflowTypeException();
throw new UnknownDataflowTypeException($fqcnOrAlias);
}
/**

View File

@@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
/**
* @codeCoverageIgnore
*/
trait InitFromDbTrait
{
private function initDateTime(array $datas): array
{
foreach (static::FIELDS_TYPE as $key => $type) {
if ('datetime' === $type && null !== $datas[$key]) {
$datas[$key] = new \DateTime($datas[$key]);
}
}
return $datas;
}
private function initArray(array $datas): array
{
if (!is_array($datas['options'])) {
$datas['options'] = $this->strToArray($datas['options']);
}
if (array_key_exists('exceptions', $datas) && !is_array($datas['exceptions'])) {
$datas['exceptions'] = $this->strToArray($datas['exceptions']);
}
return $datas;
}
private function strToArray($value): array
{
if (null === $value) {
return [];
}
$array = json_decode($value, true);
return (false === $array) ? [] : $array;
}
}

View File

@@ -4,62 +4,176 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use Doctrine\Common\Collections\Criteria;
use Doctrine\ORM\EntityRepository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
/**
* Repository.
*
* @codeCoverageIgnore
*/
class JobRepository extends EntityRepository
class JobRepository
{
use InitFromDbTrait;
public const TABLE_NAME = 'cr_dataflow_job';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'status' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => \PDO::PARAM_INT,
'count' => \PDO::PARAM_INT,
'exceptions' => \PDO::PARAM_STR,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*
* @param Connection $connection
*/
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
public function find(int $jobId)
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, \PDO::PARAM_INT)))
;
return $this->returnFirstOrNull($qb);
}
public function findOneshotDataflows(): iterable
{
return $this->findBy([
'scheduledDataflow' => null,
'status' => Job::STATUS_PENDING,
]);
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($this->initDateTime($this->initArray($row)));
}
}
public function findPendingForScheduledDataflow(ScheduledDataflow $scheduled): ?Job
{
return $this->findOneBy([
'scheduledDataflow' => $scheduled->getId(),
'status' => Job::STATUS_PENDING,
]);
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
return $this->returnFirstOrNull($qb);
}
public function findNextPendingDataflow(): ?Job
{
$criteria = (new Criteria())
->where(Criteria::expr()->lte('requestedDate', new \DateTime()))
->andWhere(Criteria::expr()->eq('status', Job::STATUS_PENDING))
->orderBy(['requestedDate' => Criteria::ASC])
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('requested_date', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)))
->orderBy('requested_date', 'ASC')
->setMaxResults(1)
;
return $this->matching($criteria)->first() ?: null;
return $this->returnFirstOrNull($qb);
}
public function findLastForDataflowId(int $dataflowId): ?Job
{
return $this->findOneBy(['scheduledDataflow' => $dataflowId], ['requestedDate' => 'desc']);
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, \PDO::PARAM_INT)))
->orderBy('requested_date', 'DESC')
->setMaxResults(1)
;
return $this->returnFirstOrNull($qb);
}
public function findLatests(): iterable
{
return $this->findBy([], ['requestedDate' => 'desc'], 20);
$qb = $this->createQueryBuilder();
$qb
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($row);
}
}
public function findForScheduled(int $id): iterable
{
return $this->findBy(['scheduledDataflow' => $id], ['requestedDate' => 'desc'], 20);
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, \PDO::PARAM_INT)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield Job::createFromArray($row);
}
}
public function save(Job $job)
{
$this->_em->persist($job);
$this->_em->flush();
$datas = $job->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
}
if (is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions']);
}
if (null === $job->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$job->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], static::FIELDS_TYPE);
}
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
$qb->select('*')
->from(static::TABLE_NAME, $alias);
return $qb;
}
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
}
}

View File

@@ -5,14 +5,44 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\Common\Collections\Criteria;
use Doctrine\ORM\EntityRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
/**
* Repository for the ScheduledDataflow entity.
*
* @codeCoverageIgnore
*/
class ScheduledDataflowRepository extends EntityRepository
class ScheduledDataflowRepository
{
use InitFromDbTrait;
public const TABLE_NAME = 'cr_dataflow_scheduled';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'frequency' => \PDO::PARAM_STR,
'next' => 'datetime',
'enabled' => \PDO::PARAM_BOOL,
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*
* @param Connection $connection
*/
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
/**
* Finds all enabled scheduled dataflows with a passed next run date.
*
@@ -20,42 +50,105 @@ class ScheduledDataflowRepository extends EntityRepository
*/
public function findReadyToRun(): iterable
{
$criteria = (new Criteria())
->where(Criteria::expr()->lte('next', new \DateTime()))
->andWhere(Criteria::expr()->eq('enabled', 1))
->orderBy(['next' => Criteria::ASC])
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->orderBy('next', 'ASC')
;
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
public function find(int $scheduleId): ?ScheduledDataflow
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, \PDO::PARAM_INT)))
->setMaxResults(1)
;
return $this->matching($criteria);
return $this->returnFirstOrNull($qb);
}
public function findAllOrderedByLabel(): iterable
{
return $this->findBy([], ['label' => 'asc']);
$qb = $this->createQueryBuilder();
$qb->orderBy('label', 'ASC');
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
}
}
public function listAllOrderedByLabel(): array
{
$query = $this->createQueryBuilder('w')
->select('w.id', 'w.label', 'w.enabled', 'w.next', 'max(j.startTime) as startTime')
->leftJoin('w.jobs', 'j')
$query = $this->connection->createQueryBuilder()
->from(static::TABLE_NAME, 'w')
->select('w.id', 'w.label', 'w.enabled', 'w.next', 'max(j.start_time) as startTime')
->leftJoin('w', JobRepository::TABLE_NAME, 'j', 'j.scheduled_dataflow_id = w.id')
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->getQuery()->execute();
return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
}
public function save(ScheduledDataflow $scheduledDataflow)
{
$this->_em->persist($scheduledDataflow);
$this->_em->flush();
$datas = $scheduledDataflow->toArray();
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
}
if (null === $scheduledDataflow->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $scheduledDataflow->getId()], static::FIELDS_TYPE);
}
public function delete(int $id): void
{
$dataflow = $this->find($id);
$this->connection->beginTransaction();
try {
$this->connection->delete(JobRepository::TABLE_NAME, ['scheduled_dataflow_id' => $id]);
$this->connection->delete(static::TABLE_NAME, ['id' => $id]);
} catch (\Throwable $e) {
$this->connection->rollBack();
throw $e;
}
$this->_em->remove($dataflow);
$this->_em->flush();
$this->connection->commit();
}
public function createQueryBuilder($alias = null): QueryBuilder
{
$qb = $this->connection->createQueryBuilder();
$qb->select('*')
->from(static::TABLE_NAME, $alias);
return $qb;
}
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
}
}

View File

@@ -10,53 +10,73 @@ services:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$validator: '@validator'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ChangeScheduleStatusCommand:
arguments:
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\RunPendingDataflowsCommand:
arguments:
$manager: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface'
$runner: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\ScheduleListCommand:
arguments:
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\SchemaCommand:
arguments:
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository:
factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
arguments: ['CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow']
lazy: true
arguments: ['@coderhapsodie.dataflow.connection']
CodeRhapsodie\DataflowBundle\Repository\JobRepository:
factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
arguments: ['CodeRhapsodie\DataflowBundle\Entity\Job']
lazy: true
arguments: ['@coderhapsodie.dataflow.connection']
coderhapsodie.dataflow.connection: "@coderhapsodie.dataflow.connection.internal"
coderhapsodie.dataflow.connection.internal:
lazy: true
class: Doctrine\DBAL\Driver\Connection
factory: ['@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory', 'getConnection']
CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory:
arguments: ['@service_container', '%coderhapsodie.dataflow.dbal_default_connection%']
CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager'
CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager:
arguments:
$em: '@doctrine.orm.default_entity_manager'
$connection: '@coderhapsodie.dataflow.connection'
$scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
arguments:
$em: '@doctrine.orm.default_entity_manager'
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$dispatcher: '@event_dispatcher'

Binary file not shown.

After

Width:  |  Height:  |  Size: 130 KiB

View File

@@ -10,14 +10,10 @@ use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var EntityManagerInterface */
private $em;
/** @var JobRepository */
private $repository;
@@ -27,9 +23,8 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(EntityManagerInterface $em, JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
{
$this->em = $em;
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
@@ -61,7 +56,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->em->flush();
$this->repository->save($job);
}
/**
@@ -82,7 +77,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
->setCount($result->getSuccessCount())
->setExceptions($exceptions)
;
$this->em->flush();
$this->repository->save($job);
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
}

View File

@@ -0,0 +1,54 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\SchemaProvider;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Schema\Schema;
/**
* Class JobSchemaProvider.
*
* @codeCoverageIgnore
*/
class DataflowSchemaProvider
{
public function createSchema()
{
$schema = new Schema();
$tableJob = $schema->createTable(JobRepository::TABLE_NAME);
$tableJob->addColumn('id', 'integer', array(
'autoincrement' => true,
));
$tableJob->setPrimaryKey(array('id'));
$tableJob->addColumn('scheduled_dataflow_id', 'integer', ['notnull' => false]);
$tableJob->addColumn('status', 'integer', ['notnull' => true]);
$tableJob->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
$tableJob->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
$tableJob->addColumn('options', 'json', ['notnull' => true]);
$tableJob->addColumn('requested_date', 'datetime', ['notnull' => false]);
$tableJob->addColumn('count', 'integer', ['notnull' => false]);
$tableJob->addColumn('exceptions', 'json', ['notnull' => false]);
$tableJob->addColumn('start_time', 'datetime', ['notnull' => false]);
$tableJob->addColumn('end_time', 'datetime', ['notnull' => false]);
$tableSchedule = $schema->createTable(ScheduledDataflowRepository::TABLE_NAME);
$tableSchedule->addColumn('id', 'integer', array(
'autoincrement' => true,
));
$tableSchedule->setPrimaryKey(array('id'));
$tableSchedule->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('options', 'json', ['notnull' => true]);
$tableSchedule->addColumn('frequency', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('next', 'datetime', ['notnull' => false]);
$tableSchedule->addColumn('enabled', 'boolean', ['notnull' => true]);
$tableJob->addForeignKeyConstraint($tableSchedule, ['scheduled_dataflow_id'], ['id']);
return $schema;
}
}

View File

@@ -8,10 +8,10 @@ use Symfony\Component\Validator\Constraint;
/**
* @Annotation
*
* @codeCoverageIgnore
*/
class Frequency extends Constraint
{
public $invalidMessage = 'The provided frequency "{{ string }}" must be a valid parameter for DateInterval::createFromDateString()';
public $negativeIntervalMessage = 'The provided frequency "{{ string }}" mustn\'t represent a negative interval';
public $message = 'The provided frequency "{{ string }}" must be a valid parameter for DateInterval::createFromDateString() and must not represent a negative value';
}

View File

@@ -25,7 +25,7 @@ class FrequencyValidator extends ConstraintValidator
$interval = @\DateInterval::createFromDateString($value);
if (!$interval) {
$this->context->buildViolation($constraint->invalidMessage)
$this->context->buildViolation($constraint->message)
->setParameter('{{ string }}', $value)
->addViolation()
;
@@ -38,7 +38,7 @@ class FrequencyValidator extends ConstraintValidator
$dt->add($interval);
if ($dt <= $now) {
$this->context->buildViolation($constraint->negativeIntervalMessage)
$this->context->buildViolation($constraint->message)
->setParameter('{{ string }}', $value)
->addViolation()
;