10 Commits

Author SHA1 Message Date
AUDUL
47af0e226c Added custom index for job status (#77)
* Added custom index for job status
2025-07-04 09:26:51 +02:00
AUDUL
6d86ba16a0 SchemaDump command (#75)
* Added possibility to install/update database from command #37
2024-10-31 16:40:09 +01:00
AUDUL
95124acc26 * Fix compatibility with doctrine 4 (#73)
* Remove some deprecated
2024-10-31 15:38:10 +01:00
AUDUL
4011f39510 V5 (#72)
* Added Symfony 7 support

* Removed Symfony 6 compatibility

* Removed Symfony 5 compatibility

* Removed Symfony 4 compatibility

* Removed Symfony 3 compatibility

* Changed README.md

* Added CI

---------

Co-authored-by: Jérémy J <jeremy@code-rhapsodie.fr>
2024-10-31 13:01:43 +01:00
Matt Mankins
db37c4bdd1 Update Kudos Github Action to support generation from source repo only (#71)
* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update GitHub Action workflow for Semicolons Kudos Action

---------

Co-authored-by: semicolons-for-kudos[bot] <145267638+semicolons-for-kudos[bot]@users.noreply.github.com>
2023-12-27 17:29:46 +01:00
Olivier PORTIER
f20cd96ec5 Update semicolons-kudos.yaml 2023-12-20 11:49:50 +01:00
Olivier PORTIER
fd2c6aaab5 Update semicolons-kudos.yaml (#70) 2023-12-20 11:36:51 +01:00
Olivier PORTIER
4efd310a6e Initiate Kudos on dataflow-bundle by creating new file semicolons-kudos.yaml (#69) 2023-12-20 11:11:26 +01:00
Jérémy J
cec42a3337 Fix log exception argument typing 2023-12-06 13:56:16 +01:00
jbcr
d440ad008b add sonar config 2023-11-16 16:50:24 +01:00
45 changed files with 528 additions and 362 deletions

27
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: Build
on:
push:
branches:
- master
jobs:
build:
name: Build
runs-on: ubuntu-latest
permissions: read-all
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- uses: sonarsource/sonarqube-scan-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
# If you wish to fail your job when the Quality Gate is red, uncomment the
# following lines. This would typically be used to fail a deployment.
# - uses: sonarsource/sonarqube-quality-gate-action@master
# timeout-minutes: 5
# env:
# SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

12
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
name: CI
on: [push]
jobs:
build-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: php-actions/composer@v6 # or alternative dependency management
- uses: php-actions/phpunit@v4

26
.github/workflows/semicolons-kudos.yaml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Kudos for Code
on:
push:
branches: ["master"]
workflow_dispatch:
jobs:
kudos:
name: Semicolons Kudos
permissions: write-all
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: LoremLabs/kudos-for-code-action@latest
with:
search-dir: "."
destination: "artifact"
generate-nomerges: true
generate-validemails: true
generate-limitdepth: 0
generate-fromrepo: true
analyze-repo: false
skip-ids: ""

3
.gitignore vendored
View File

@@ -3,3 +3,6 @@ composer.lock
.phpunit.result.cache
.php_cs.cache
.php_cs
.idea
.phpunit.cache
.php-version

View File

@@ -1,3 +1,28 @@
# Version 5.2.0
* Added custom index for job status
# Version 5.1.0
* Refactor SchemaDump command
# Version 5.0.1
* Fix compatibility with doctrine 4
# Version 5.0.0
* Initiate Kudos on dataflow-bundle
* Added Symfony 7 support
* Removed Symfony 6 compatibility
* Removed Symfony 5 compatibility
* Removed Symfony 4 compatibility
* Removed Symfony 3 compatibility
* Changed README.md
* Added CI
# Version 4.1.3
* Fix log exception argument typing
# Version 4.1.2
* Fix DBAL 2.12 compatibility break
# Version 4.1.0
* Added custom index for exception log

113
README.md
View File

@@ -1,14 +1,23 @@
# Code Rhapsodie Dataflow Bundle
DataflowBundle is a bundle for Symfony 3.4+
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps that can be synchronous or asynchronous
* one or more writers
| Dataflow | Symfony | Support |
|----------|--------------------------|---------|
| 5.x | 7.x | yes |
| 4.x | 3.4 \| 4.x \| 5.x \| 6.x | yes |
| 3.x | 3.4 \| 4.x \| 5.x | no |
| 2.x | 3.4 \| 4.x | no |
| 1.x | 3.4 \| 4.x | no |
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps that can be synchronous or asynchronous
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
The steps are executed in the order in which they are added.
And, one or more writers save the row anywhere you want.
@@ -27,7 +36,6 @@ As the following schema shows, you can define more than one dataflow:
* Display the result for the last Job for a Dataflow from the command line
* Work with multiple Doctrine DBAL connections
## Installation
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
@@ -44,7 +52,8 @@ $ composer require code-rhapsodie/dataflow-bundle
You can use the generic readers, writers and steps from [PortPHP](https://github.com/portphp/portphp).
For the writers, you must use the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` like this:
For the writers, you must use the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` like
this:
```php
<?php
@@ -57,9 +66,7 @@ $builder->addWriter(new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWr
### Register the bundle
#### Symfony 4 (new tree)
For Symfony 4, add `CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
Add `CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
` in the `config/bundles.php` file.
Like this:
@@ -74,32 +81,13 @@ return [
];
```
#### Symfony 3.4 (old tree)
For Symfony 3.4, add a new line in the `app/AppKernel.php` file.
Like this:
```php
<?php
// app/AppKernel.php
public function registerBundles()
{
$bundles = [
// ...
new CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle(),
// ...
];
}
```
### Update the database
This bundle uses Doctrine DBAL to store Dataflow schedule into the database table (`cr_dataflow_scheduled`)
and jobs (`cr_dataflow_job`).
If you use [Doctrine Migration Bundle](https://symfony.com/doc/master/bundles/DoctrineMigrationsBundle/index.html) or [Phinx](https://phinx.org/)
If you use [Doctrine Migration Bundle](https://symfony.com/doc/master/bundles/DoctrineMigrationsBundle/index.html)
or [Phinx](https://phinx.org/)
or [Kaliop Migration Bundle](https://github.com/kaliop-uk/ezmigrationbundle) or whatever,
you can add a new migration with the generated SQL query from this command:
@@ -137,6 +125,7 @@ Dataflow can delegate the execution of its jobs to the Symfony messenger compone
This allows jobs to be executed concurrently by workers instead of sequentially.
To enable messenger mode:
```yaml
code_rhapsodie_dataflow:
messenger_mode:
@@ -145,6 +134,7 @@ code_rhapsodie_dataflow:
```
You also need to route Dataflow messages to the proper transport:
```yaml
# config/packages/messenger.yaml
framework:
@@ -158,9 +148,11 @@ framework:
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of
your dataflow.
A dataflow type defines the different parts of your dataflow. A dataflow is made of:
- exactly one *Reader*
- any number of *Steps*
- one or more *Writers*
@@ -169,8 +161,10 @@ Dataflow types can be configured with options.
A dataflow type must implement `CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface`.
To help with creating your dataflow types, an abstract class `CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType`
is provided, allowing you to define your dataflow through a handy builder `CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder`.
To help with creating your dataflow types, an abstract
class `CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType`
is provided, allowing you to define your dataflow through a handy
builder `CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder`.
This is an example to define one class DataflowType:
@@ -230,15 +224,16 @@ class MyFirstDataflowType extends AbstractDataflowType
Dataflow types must be tagged with `coderhapsodie.dataflow.type`.
If you're using Symfony auto-configuration for your services, this tag will be automatically added to all services implementing `DataflowTypeInterface`.
If you're using Symfony auto-configuration for your services, this tag will be automatically added to all services
implementing `DataflowTypeInterface`.
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
- { name: coderhapsodie.dataflow.type }
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
- { name: coderhapsodie.dataflow.type }
```
### Use options for your dataflow type
@@ -264,11 +259,14 @@ class MyFirstDataflowType extends AbstractDataflowType
}
```
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read
the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
For asynchronous management, `AbstractDataflowType` come with two default options :
- loopInterval : default to 0. Update this interval if you wish customise the `tick` loop duration.
- emitInterval : default to 0. Update this interval to have a control when reader must emit new data in the flow pipeline.
- emitInterval : default to 0. Update this interval to have a control when reader must emit new data in the flow
pipeline.
### Logging
@@ -292,14 +290,15 @@ class MyDataflowType extends AbstractDataflowType
}
```
When using the `code-rhapsodie:dataflow:run-pending` command, this logger will also be used to save the log in the corresponding job in the database.
When using the `code-rhapsodie:dataflow:run-pending` command, this logger will also be used to save the log in the
corresponding job in the database.
### Check if your DataflowType is ready
Execute this command to check if your DataflowType is correctly registered:
```shell script
$ bin/console debug:container --tag coderhapsodie.dataflow.type --show-private
$ bin/console debug:container --tag coderhapsodie.dataflow.type
```
The result is like this:
@@ -316,10 +315,10 @@ Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflo
```
### Readers
*Readers* provide the dataflow with elements to import / export. Usually, elements are read from an external resource (file, database, webservice, etc).
*Readers* provide the dataflow with elements to import / export. Usually, elements are read from an external resource (
file, database, webservice, etc).
A *Reader* can be any `iterable`.
@@ -357,15 +356,16 @@ You can set up this reader as follows:
$builder->setReader(($this->myReader)())
```
### Steps
*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
- converters, that alter the element
- filters, that conditionally prevent further operations on the element
- generators, that can include asynchronous operations
A *Step* can be any callable, taking the element as its argument, and returning either:
- the element, possibly altered
- `false`, if no further operations should be performed on this element
@@ -409,7 +409,8 @@ Note : you can ensure writing order for asynchronous operations if all steps are
*Writers* perform the actual import / export operations.
A *Writer* must implement `CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface`.
As this interface is not compatible with `Port\Writer`, the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` is provided.
As this interface is not compatible with `Port\Writer`, the
adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` is provided.
This example show how to use the predefined PhpPort Writer :
@@ -460,7 +461,9 @@ class FileWriter implements WriterInterface
#### CollectionWriter
If you want to write multiple items from a single item read, you can use the generic `CollectionWriter`. This writer will iterate over any `iterable` it receives, and pass each item from that collection to your own writer that handles single items.
If you want to write multiple items from a single item read, you can use the generic `CollectionWriter`. This writer
will iterate over any `iterable` it receives, and pass each item from that collection to your own writer that handles
single items.
```php
$builder->addWriter(new CollectionWriter($mySingleItemWriter));
@@ -470,9 +473,11 @@ $builder->addWriter(new CollectionWriter($mySingleItemWriter));
If you want to call different writers depending on what item is read, you can use the generic `DelegatorWriter`.
As an example, let's suppose our items are arrays with the first entry being either `product` or `order`. We want to use a different writer based on that value.
As an example, let's suppose our items are arrays with the first entry being either `product` or `order`. We want to use
a different writer based on that value.
First, create your writers implementing `DelegateWriterInterface` (this interface extends `WriterInterface` so your writers can still be used without the `DelegatorWriter`).
First, create your writers implementing `DelegateWriterInterface` (this interface extends `WriterInterface` so your
writers can still be used without the `DelegatorWriter`).
```php
<?php
@@ -545,7 +550,8 @@ Then, configure your `DelegatorWriter` and add it to your dataflow type.
}
```
During execution, the `DelegatorWriter` will simply pass each item received to its first delegate (in the order those were added) that supports it. If no delegate supports an item, an exception will be thrown.
During execution, the `DelegatorWriter` will simply pass each item received to its first delegate (in the order those
were added) that supports it. If no delegate supports an item, an exception will be thrown.
## Queue
@@ -563,7 +569,8 @@ Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:run-pending` Executes job in the queue according to their schedule.
When messenger mode is enabled, jobs will still be created according to their schedule, but execution will be handled by the messenger component instead.
When messenger mode is enabled, jobs will still be created according to their schedule, but execution will be handled by
the messenger component instead.
`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
@@ -602,7 +609,7 @@ $ bin/console code-rhapsodie:dataflow:run-pending --connection=default
Please report issues and request features at https://github.com/code-rhapsodie/dataflow-bundle/issues.
Please note that only the last release of the 3.x and the 4.x versions of this bundle are actively supported.
Please note that only the last release of the 4.x and the 5.x versions of this bundle are actively supported.
# Contributing

View File

@@ -4,7 +4,6 @@ namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Dataflow;
use Amp\Delayed;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use PHPUnit\Framework\TestCase;

View File

@@ -40,10 +40,13 @@ class CollectionWriterTest extends TestCase
->expects($this->once())
->method('finish')
;
$matcher = $this->exactly(count($values));
$embeddedWriter
->expects($this->exactly(count($values)))
->expects($matcher)
->method('write')
->withConsecutive(...array_map(fn($item) => [$item], $values))
->with($this->callback(function ($arg) use ($matcher, $values) {
return $arg === $values[$matcher->numberOfInvocations() - 1];
}))
;
$writer = new CollectionWriter($embeddedWriter);

View File

@@ -10,13 +10,10 @@ use PHPUnit\Framework\TestCase;
class DelegatorWriterTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter $delegatorWriter;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateInt;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateString;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateArray;
private DelegatorWriter $delegatorWriter;
private DelegateWriterInterface|MockObject $delegateInt;
private DelegateWriterInterface|MockObject $delegateString;
private DelegateWriterInterface|MockObject $delegateArray;
protected function setUp(): void
{

View File

@@ -12,7 +12,7 @@ class PortWriterAdapterTest extends TestCase
$value = 'not an array';
$writer = $this->getMockBuilder('\Port\Writer')
->setMethods(['prepare', 'finish', 'writeItem'])
->onlyMethods(['prepare', 'finish', 'writeItem'])
->getMock()
;
$writer

View File

@@ -2,10 +2,8 @@
namespace CodeRhapsodie\DataflowBundle\Tests\Manager;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
@@ -15,13 +13,10 @@ use PHPUnit\Framework\TestCase;
class ScheduledDataflowManagerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager $manager;
private \Doctrine\DBAL\Connection|\PHPUnit\Framework\MockObject\MockObject $connection;
private \CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository|\PHPUnit\Framework\MockObject\MockObject $scheduledDataflowRepository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $jobRepository;
private ScheduledDataflowManager $manager;
private Connection|MockObject $connection;
private ScheduledDataflowRepository|MockObject $scheduledDataflowRepository;
private JobRepository|MockObject $jobRepository;
protected function setUp(): void
{
@@ -50,10 +45,20 @@ class ScheduledDataflowManagerTest extends TestCase
->willReturn([$scheduled1, $scheduled2])
;
$matcher = $this->exactly(2);
$this->jobRepository
->expects($this->exactly(2))
->expects($matcher)
->method('findPendingForScheduledDataflow')
->withConsecutive([$scheduled1], [$scheduled2])
->with($this->callback(function ($arg) use ($matcher, $scheduled1, $scheduled2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $scheduled1;
case 2:
return $arg === $scheduled2;
default:
return false;
}
}))
->willReturnOnConsecutiveCalls(new Job(), null)
;
@@ -103,7 +108,7 @@ class ScheduledDataflowManagerTest extends TestCase
$this->jobRepository
->expects($this->exactly(1))
->method('findPendingForScheduledDataflow')
->withConsecutive([$scheduled1])
->with($scheduled1)
->willThrowException(new \Exception())
;

View File

@@ -14,11 +14,9 @@ use PHPUnit\Framework\TestCase;
class JobMessageHandlerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
private \CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler $handler;
private JobRepository|MockObject $repository;
private JobProcessorInterface|MockObject $processor;
private JobMessageHandler $handler;
protected function setUp(): void
{
@@ -28,11 +26,6 @@ class JobMessageHandlerTest extends TestCase
$this->handler = new JobMessageHandler($this->repository, $this->processor);
}
public function testGetHandledMessages()
{
$this->assertSame([JobMessage::class], JobMessageHandler::getHandledMessages());
}
public function testInvoke()
{
$message = new JobMessage($id = 32);

View File

@@ -16,13 +16,10 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessorTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessor $processor;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface|\PHPUnit\Framework\MockObject\MockObject $registry;
private \Symfony\Component\EventDispatcher\EventDispatcherInterface|\PHPUnit\Framework\MockObject\MockObject $dispatcher;
private JobProcessor $processor;
private JobRepository|MockObject $repository;
private DataflowTypeRegistryInterface|MockObject $registry;
private EventDispatcherInterface|MockObject $dispatcher;
protected function setUp(): void
{
@@ -42,36 +39,25 @@ class JobProcessorTest extends TestCase
->setOptions($options = ['option1' => 'value1'])
;
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
[
Events::AFTER_PROCESSING,
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::BEFORE_PROCESSING,
],
[
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::AFTER_PROCESSING,
],
);
}
$matcher = $this->exactly(2);
$this->dispatcher
->expects($matcher)
->method('dispatch')
->with(
$this->callback(function ($arg) use ($job) {
return $arg instanceof ProcessingEvent && $arg->getJob() === $job;
}),
$this->callback(function ($arg) use ($matcher) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === Events::BEFORE_PROCESSING;
case 2:
return $arg === Events::AFTER_PROCESSING;
default:
return false;
}
})
);
$dataflowType = $this->createMock(DataflowTypeInterface::class);

View File

@@ -10,7 +10,7 @@ use PHPUnit\Framework\TestCase;
class DataflowTypeRegistryTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry $registry;
private DataflowTypeRegistry $registry;
protected function setUp(): void
{

View File

@@ -13,11 +13,9 @@ use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunnerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner $runner;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \Symfony\Component\Messenger\MessageBusInterface|\PHPUnit\Framework\MockObject\MockObject $bus;
private MessengerDataflowRunner $runner;
private JobRepository|MockObject $repository;
private MessageBusInterface|MockObject $bus;
protected function setUp(): void
{
@@ -37,20 +35,36 @@ class MessengerDataflowRunnerTest extends TestCase
->method('findNextPendingDataflow')
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$matcher = $this->exactly(2);
$this->repository
->expects($this->exactly(2))
->expects($matcher)
->method('save')
->withConsecutive([$job1], [$job2])
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $job1;
case 2:
return $arg === $job2;
default:
return false;
}
}))
;
$matcher = $this->exactly(2);
$this->bus
->expects($this->exactly(2))
->expects($matcher)
->method('dispatch')
->withConsecutive([
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id1)
], [
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id2)
])
->with($this->callback(function ($arg) use ($matcher, $id1, $id2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg instanceof JobMessage && $arg->getJobId() === $id1;
case 2:
return $arg instanceof JobMessage && $arg->getJobId() === $id2;
default:
return false;
}
}))
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),
new Envelope(new JobMessage($id2))

View File

@@ -11,11 +11,9 @@ use PHPUnit\Framework\TestCase;
class PendingDataflowRunnerTest extends TestCase
{
private \CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner $runner;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
private PendingDataflowRunner $runner;
private JobRepository|MockObject $repository;
private JobProcessorInterface|MockObject $processor;
protected function setUp(): void
{
@@ -36,10 +34,20 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$matcher = $this->exactly(2);
$this->processor
->expects($this->exactly(2))
->expects($matcher)
->method('process')
->withConsecutive([$job1], [$job2])
->with($this->callback(function ($arg) use ($matcher, $job1, $job2) {
switch ($matcher->numberOfInvocations()) {
case 1:
return $arg === $job1;
case 2:
return $arg === $job2;
default:
return false;
}
}))
;
$this->runner->runPendingDataflows();

View File

@@ -4,18 +4,18 @@ namespace CodeRhapsodie\DataflowBundle\Tests\Validator\Constraints;
use CodeRhapsodie\DataflowBundle\Validator\Constraints\Frequency;
use CodeRhapsodie\DataflowBundle\Validator\Constraints\FrequencyValidator;
use PHPUnit\Framework\Attributes\DataProvider;
use Symfony\Component\Validator\ConstraintValidatorInterface;
use Symfony\Component\Validator\Test\ConstraintValidatorTestCase;
class FrequencyValidatorTest extends ConstraintValidatorTestCase
{
protected function createValidator()
protected function createValidator(): ConstraintValidatorInterface
{
return new FrequencyValidator();
}
/**
* @dataProvider getValidValues
*/
#[DataProvider('getValidValues')]
public function testValidValues($value)
{
$this->validator->validate($value, new Frequency());
@@ -23,7 +23,7 @@ class FrequencyValidatorTest extends ConstraintValidatorTestCase
$this->assertNoViolation();
}
public function getValidValues()
public static function getValidValues()
{
return [
['3 days'],
@@ -47,9 +47,7 @@ class FrequencyValidatorTest extends ConstraintValidatorTestCase
;
}
/**
* @dataProvider getNegativeValues
*/
#[DataProvider('getNegativeValues')]
public function testNegativeIntervals($value)
{
$constraint = new Frequency([
@@ -64,7 +62,7 @@ class FrequencyValidatorTest extends ConstraintValidatorTestCase
;
}
public function getNegativeValues()
public static function getNegativeValues()
{
return [
['now'],

View File

@@ -43,26 +43,27 @@
"require": {
"php": "^8.0",
"ext-json": "*",
"doctrine/dbal": "^2.12||^3.0",
"doctrine/doctrine-bundle": "^1.0||^2.0",
"monolog/monolog": "^1.0||^2.0",
"doctrine/dbal": "^3.0||^4.0",
"doctrine/doctrine-bundle": "^2.0",
"monolog/monolog": "^2.0||^3.0",
"psr/log": "^1.1||^2.0||^3.0",
"symfony/config": "^3.4||^4.0||^5.0||^6.0",
"symfony/console": "^3.4||^4.0||^5.0||^6.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0||^6.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0||^6.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0||^6.0",
"symfony/lock": "^3.4||^4.0||^5.0||^6.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0||^6.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0||^6.0",
"symfony/validator": "^3.4||^4.0||^5.0||^6.0",
"symfony/yaml": "^3.4||^4.0||^5.0||^6.0"
"symfony/config": "^7.0",
"symfony/console": "^7.0",
"symfony/dependency-injection": "^7.0",
"symfony/event-dispatcher": "^7.0",
"symfony/http-kernel": "^7.0",
"symfony/lock": "^7.0",
"symfony/monolog-bridge": "^7.0",
"symfony/options-resolver": "^7.0",
"symfony/validator": "^7.0",
"symfony/yaml": "^7.0"
},
"require-dev": {
"amphp/amp": "^2.5",
"phpunit/phpunit": "^7||^8||^9",
"rector/rector": "^0.13.10",
"symfony/messenger": "^4.4||^5.0||^6.0"
"phpunit/phpunit": "^11",
"portphp/portphp": "^1.9",
"rector/rector": "^1.0",
"symfony/messenger": "^7.0"
},
"suggest": {
"amphp/amp": "Provide asynchronous steps for your dataflows",

View File

@@ -1,15 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- http://www.phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" backupStaticAttributes="false" bootstrap="Tests/bootstrap.php" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" colors="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<coverage>
<include>
<directory>./src/</directory>
</include>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</coverage>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" bootstrap="Tests/bootstrap.php" colors="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/11.1/phpunit.xsd" cacheDirectory=".phpunit.cache" backupStaticProperties="false">
<php>
<ini name="error_reporting" value="-1"/>
</php>
@@ -18,4 +9,13 @@
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>
<source>
<include>
<directory>./src/</directory>
</include>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</source>
</phpunit>

4
sonar-project.properties Normal file
View File

@@ -0,0 +1,4 @@
sonar.projectKey=code-rhapsodie_dataflow-bundle_AYvYuaAwWE9sbcQmD1vw
sonar.sources=src
sonar.tests=Tests

View File

@@ -17,14 +17,14 @@ use Symfony\Component\HttpKernel\Bundle\Bundle;
*/
class CodeRhapsodieDataflowBundle extends Bundle
{
protected $name = 'CodeRhapsodieDataflowBundle';
protected string $name = 'CodeRhapsodieDataflowBundle';
public function getContainerExtension(): ?ExtensionInterface
{
return new CodeRhapsodieDataflowExtension();
}
public function build(ContainerBuilder $container)
public function build(ContainerBuilder $container): void
{
$container
->addCompilerPass(new DataflowTypeCompilerPass())

View File

@@ -8,6 +8,7 @@ use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
@@ -18,10 +19,9 @@ use Symfony\Component\Validator\Validator\ValidatorInterface;
/**
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:schedule:add', 'Create a scheduled dataflow')]
class AddScheduledDataflowCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
public function __construct(private DataflowTypeRegistryInterface $registry, private ScheduledDataflowRepository $scheduledDataflowRepository, private ValidatorInterface $validator, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -30,10 +30,9 @@ class AddScheduledDataflowCommand extends Command
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Create a scheduled dataflow')
->setHelp('The <info>%command.name%</info> allows you to create a new scheduled dataflow.')
->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
@@ -95,7 +94,7 @@ class AddScheduledDataflowCommand extends Command
'dataflow_type' => $type,
'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
'frequency' => $frequency,
'next' => new \DateTimeImmutable($firstRun),
'next' => new \DateTime($firstRun),
'enabled' => $enabled,
]);

View File

@@ -7,6 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -17,10 +18,9 @@ use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:schedule:change-status', 'Change schedule status')]
class ChangeScheduleStatusCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -29,10 +29,9 @@ class ChangeScheduleStatusCommand extends Command
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Change schedule status')
->setHelp('The <info>%command.name%</info> command able you to change schedule status.')
->addArgument('schedule-id', InputArgument::REQUIRED, 'Id of the schedule')
->addOption('enable', null, InputOption::VALUE_NONE, 'Enable the schedule')

View File

@@ -0,0 +1,113 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ConfirmationQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
#[AsCommand(name: 'code-rhapsodie:dataflow:database-schema', description: 'Generates schema create / update SQL queries')]
class DatabaseSchemaCommand extends Command
{
public function __construct(private ConnectionFactory $connectionFactory)
{
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure(): void
{
$this
->setHelp('The <info>%command.name%</info> help you to generate SQL Query to create or update your database schema for this bundle')
->addOption('dump-sql', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.')
->addOption('update', null, InputOption::VALUE_NONE, 'Dump/execute only the update SQL queries.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$connection = $this->connectionFactory->getConnection();
$schemaProvider = new DataflowSchemaProvider();
$schema = $schemaProvider->createSchema();
$sqls = $schema->toSql($connection->getDatabasePlatform());
if ($input->getOption('update')) {
$sm = $connection->createSchemaManager();
$tableArray = [JobRepository::TABLE_NAME, ScheduledDataflowRepository::TABLE_NAME];
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
if (in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
$namespaces = [];
if ($connection->getDatabasePlatform()->supportsSchemas()) {
$namespaces = $sm->listSchemaNames();
}
$sequences = [];
if ($connection->getDatabasePlatform()->supportsSequences()) {
$sequences = $sm->listSequences();
}
$oldSchema = new Schema($tables, $sequences, $sm->createSchemaConfig(), $namespaces);
$sqls = $connection->getDatabasePlatform()->getAlterSchemaSQL((new Comparator($connection->getDatabasePlatform()))->compareSchemas($oldSchema, $schema));
if (empty($sqls)) {
$io->info('There is no update SQL queries.');
}
}
if ($input->getOption('dump-sql')) {
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql . ';');
}
return Command::SUCCESS;
}
if (!$io->askQuestion(new ConfirmationQuestion('Are you sure to update database ?', true))) {
$io->text("Execution canceled.");
return Command::SUCCESS;
}
foreach ($sqls as $sql) {
$connection->executeQuery($sql);
}
$io->success(sprintf('%d queries executed.', \count($sqls)));
return parent::SUCCESS;
}
}

View File

@@ -8,6 +8,7 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -20,12 +21,11 @@ use Symfony\Component\Console\Style\SymfonyStyle;
*
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:execute', 'Runs one dataflow type with provided options')]
class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -34,10 +34,9 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Runs one dataflow type with provided options')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command runs one dataflow with the provided options.

View File

@@ -7,6 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
@@ -16,6 +17,7 @@ use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:job:show', 'Display job details for schedule or specific job')]
class JobShowCommand extends Command
{
private const STATUS_MAPPING = [
@@ -24,8 +26,6 @@ class JobShowCommand extends Command
Job::STATUS_COMPLETED => 'Completed',
];
protected static $defaultName = 'code-rhapsodie:dataflow:job:show';
public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -34,10 +34,9 @@ class JobShowCommand extends Command
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Display job details for schedule or specific job')
->setHelp('The <info>%command.name%</info> display job details for schedule or specific job.')
->addOption('job-id', null, InputOption::VALUE_REQUIRED, 'Id of the job to get details')
->addOption('schedule-id', null, InputOption::VALUE_REQUIRED, 'Id of schedule for last execution details')

View File

@@ -7,6 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
@@ -18,12 +19,11 @@ use Symfony\Component\Console\Output\OutputInterface;
*
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:run-pending', 'Runs dataflows based on the scheduled defined in the UI.')]
class RunPendingDataflowsCommand extends Command
{
use LockableTrait;
protected static $defaultName = 'code-rhapsodie:dataflow:run-pending';
public function __construct(private ScheduledDataflowManagerInterface $manager, private PendingDataflowRunnerInterface $runner, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -32,10 +32,9 @@ class RunPendingDataflowsCommand extends Command
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Runs dataflows based on the scheduled defined in the UI.')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command runs dataflows according to the schedule defined in the UI by the user.
EOF

View File

@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
@@ -15,10 +16,9 @@ use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
*/
#[AsCommand('code-rhapsodie:dataflow:schedule:list', 'List scheduled dataflows')]
class ScheduleListCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
@@ -27,10 +27,9 @@ class ScheduleListCommand extends Command
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('List scheduled dataflows')
->setHelp('The <info>%command.name%</info> lists all scheduled dataflows.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use');
}

View File

@@ -8,9 +8,12 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
@@ -18,23 +21,17 @@ use Symfony\Component\Console\Style\SymfonyStyle;
/**
* @codeCoverageIgnore
* @deprecated This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead.
*/
#[AsCommand('code-rhapsodie:dataflow:dump-schema', 'Generates schema create / update SQL queries')]
class SchemaCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:dump-schema';
public function __construct(private ConnectionFactory $connectionFactory)
{
parent::__construct();
}
/**
* {@inheritdoc}
*/
protected function configure()
protected function configure(): void
{
$this
->setDescription('Generates schema create / update SQL queries')
->setHelp('The <info>%command.name%</info> help you to generate SQL Query to create or update your database schema for this bundle')
->addOption('update', null, InputOption::VALUE_NONE, 'Dump only the update SQL queries.')
->addOption('connection', null, InputOption::VALUE_REQUIRED, 'Define the DBAL connection to use')
@@ -46,51 +43,24 @@ class SchemaCommand extends Command
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$connection = $this->connectionFactory->getConnection();
$schemaProvider = new DataflowSchemaProvider();
$schema = $schemaProvider->createSchema();
$sqls = $schema->toSql($connection->getDatabasePlatform());
if ($input->getOption('update')) {
$sm = $connection->getSchemaManager();
$tableArray = [JobRepository::TABLE_NAME, ScheduledDataflowRepository::TABLE_NAME];
$tables = [];
foreach ($sm->listTables() as $table) {
/** @var Table $table */
if (in_array($table->getName(), $tableArray)) {
$tables[] = $table;
}
}
$namespaces = [];
if ($connection->getDatabasePlatform()->supportsSchemas()) {
$namespaces = $sm->listNamespaceNames();
}
$sequences = [];
if ($connection->getDatabasePlatform()->supportsSequences()) {
$sequences = $sm->listSequences();
}
$oldSchema = new Schema($tables, $sequences, $sm->createSchemaConfig(), $namespaces);
$sqls = $schema->getMigrateFromSql($oldSchema, $connection->getDatabasePlatform());
}
$io = new SymfonyStyle($input, $output);
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql.';');
}
$io->warning('This command is deprecated and will be removed in 6.0, use this command "code-rhapsodie:dataflow:database-schema" instead.');
return 0;
$options = array_filter($input->getOptions());
//add -- before each keys
$options = array_combine(
array_map(fn($key) => '--' . $key, array_keys($options)),
array_values($options)
);
$options['--dump-sql'] = true;
$inputArray = new ArrayInput([
'command' => 'code-rhapsodie:dataflow:database-schema',
...$options
]);
return $this->getApplication()->doRun($inputArray, $output);
}
}

View File

@@ -11,19 +11,17 @@ use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
public function __construct(protected ?int $loopInterval = 0, protected ?int $emitInterval = 0)
{
}
private ?string $name = null;
private ?iterable $reader = null;
private array $steps = [];
/** @var WriterInterface[] */
private array $writers = [];
public function __construct(protected ?int $loopInterval = 0, protected ?int $emitInterval = 0)
{
}
public function setName(string $name): self
{
$this->name = $name;

View File

@@ -68,7 +68,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
$count = 0;
$exceptions = [];
$startTime = new \DateTimeImmutable();
$startTime = new \DateTime();
try {
foreach ($this->writers as $writer) {
@@ -110,7 +110,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
$this->logException($e);
}
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
}
/**

View File

@@ -62,7 +62,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
{
$count = 0;
$exceptions = [];
$startTime = new \DateTimeImmutable();
$startTime = new \DateTime();
try {
foreach ($this->writers as $writer) {
@@ -97,7 +97,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
$this->logException($e);
}
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
}
private function processItem(mixed $item): void
@@ -115,7 +115,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
}
}
private function logException(\Throwable $e, ?string $index = null): void
private function logException(\Throwable $e, string|int|null $index = null): void
{
if (!isset($this->logger)) {
return;

View File

@@ -11,9 +11,7 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
class DataflowBuilder
{
private ?string $name = null;
private ?iterable $reader = null;
private array $steps = [];
/** @var WriterInterface[] */

View File

@@ -13,12 +13,7 @@ class Configuration implements ConfigurationInterface
public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
{
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
if (method_exists($treeBuilder, 'getRootNode')) {
$rootNode = $treeBuilder->getRootNode();
} else {
// BC for symfony/config < 4.2
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
}
$rootNode = $treeBuilder->getRootNode();
$rootNode
->children()

View File

@@ -72,10 +72,6 @@ class Job
->setScheduledDataflowId($scheduled->getId());
}
public function __construct()
{
}
public static function createFromArray(array $datas)
{
$lost = array_diff(static::KEYS, array_keys($datas));

View File

@@ -4,17 +4,11 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Event;
/*
use Symfony\Contracts\EventDispatcher\Event;
/**
* @codeCoverageIgnore
*/
if (class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
// For Symfony 5.0+
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
{
}
} else {
// For Symfony 3.4 to 4.4
abstract class CrEvent extends \Symfony\Component\EventDispatcher\Event
{
}
abstract class CrEvent extends Event
{
}

View File

@@ -8,6 +8,7 @@ use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
use Monolog\LogRecord;
class BufferHandler extends AbstractProcessingHandler
{
@@ -15,11 +16,6 @@ class BufferHandler extends AbstractProcessingHandler
private array $buffer = [];
public function __construct($level = Logger::DEBUG, bool $bubble = true)
{
parent::__construct($level, $bubble);
}
public function clearBuffer(): array
{
$logs = $this->buffer;
@@ -28,7 +24,7 @@ class BufferHandler extends AbstractProcessingHandler
return $logs;
}
protected function write(array $record): void
protected function write(array|LogRecord $record): void
{
$this->buffer[] = $record['formatted'];
}

View File

@@ -6,9 +6,10 @@ namespace CodeRhapsodie\DataflowBundle\MessengerMode;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
class JobMessageHandler implements MessageSubscriberInterface
#[AsMessageHandler]
class JobMessageHandler
{
public function __construct(private JobRepository $repository, private JobProcessorInterface $processor)
{
@@ -18,9 +19,4 @@ class JobMessageHandler implements MessageSubscriberInterface
{
$this->processor->process($this->repository->find($message->getJobId()));
}
public static function getHandledMessages(): iterable
{
return [JobMessage::class];
}
}

View File

@@ -53,12 +53,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
$job
->setStatus(Job::STATUS_RUNNING)
@@ -77,11 +72,6 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}

View File

@@ -9,9 +9,11 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
*/
trait InitFromDbTrait
{
abstract private function getFields(): array;
private function initDateTime(array $datas): array
{
foreach (static::FIELDS_TYPE as $key => $type) {
foreach ($this->getFields() as $key => $type) {
if ('datetime' === $type && null !== $datas[$key]) {
$datas[$key] = new \DateTime($datas[$key]);
}

View File

@@ -21,20 +21,6 @@ class JobRepository
public const TABLE_NAME = 'cr_dataflow_job';
private const FIELDS_TYPE = [
'id' => ParameterType::INTEGER,
'status' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => ParameterType::INTEGER,
'count' => ParameterType::INTEGER,
'exceptions' => ParameterType::STRING,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
/**
* JobRepository constructor.
*/
@@ -58,7 +44,7 @@ class JobRepository
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -106,7 +92,7 @@ class JobRepository
$qb
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -121,7 +107,7 @@ class JobRepository
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -143,12 +129,12 @@ class JobRepository
}
if (null === $job->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$job->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], static::FIELDS_TYPE);
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $job->getId()], $this->getFields());
}
public function createQueryBuilder($alias = null): QueryBuilder
@@ -162,11 +148,28 @@ class JobRepository
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return null;
}
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
private function getFields(): array
{
return [
'id' => ParameterType::INTEGER,
'status' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => ParameterType::INTEGER,
'count' => ParameterType::INTEGER,
'exceptions' => ParameterType::STRING,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
}
}

View File

@@ -20,16 +20,6 @@ class ScheduledDataflowRepository
public const TABLE_NAME = 'cr_dataflow_scheduled';
private const FIELDS_TYPE = [
'id' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'frequency' => ParameterType::STRING,
'next' => 'datetime',
'enabled' => ParameterType::BOOLEAN,
];
/**
* JobRepository constructor.
*/
@@ -46,11 +36,11 @@ class ScheduledDataflowRepository
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->andWhere($qb->expr()->eq('enabled', $qb->createNamedParameter(1, ParameterType::INTEGER)))
->orderBy('next', 'ASC')
;
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -74,7 +64,7 @@ class ScheduledDataflowRepository
$qb = $this->createQueryBuilder();
$qb->orderBy('label', 'ASC');
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -92,7 +82,7 @@ class ScheduledDataflowRepository
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->execute()->fetchAllAssociative();
return $query->executeQuery()->fetchAllAssociative();
}
public function save(ScheduledDataflow $scheduledDataflow)
@@ -105,12 +95,12 @@ class ScheduledDataflowRepository
}
if (null === $scheduledDataflow->getId()) {
$this->connection->insert(static::TABLE_NAME, $datas, static::FIELDS_TYPE);
$this->connection->insert(static::TABLE_NAME, $datas, $this->getFields());
$scheduledDataflow->setId((int) $this->connection->lastInsertId());
return;
}
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $scheduledDataflow->getId()], static::FIELDS_TYPE);
$this->connection->update(static::TABLE_NAME, $datas, ['id' => $scheduledDataflow->getId()], $this->getFields());
}
public function delete(int $id): void
@@ -138,11 +128,24 @@ class ScheduledDataflowRepository
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->execute();
$stmt = $qb->executeQuery();
if (0 === $stmt->rowCount()) {
return null;
}
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
private function getFields(): array
{
return [
'id' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'frequency' => ParameterType::STRING,
'next' => 'datetime',
'enabled' => ParameterType::BOOLEAN,
];
}
}

View File

@@ -45,9 +45,15 @@ services:
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\SchemaCommand:
deprecated:
package: 'code-rhapsodie/dataflow-bundle'
version: '5.0'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\DatabaseSchemaCommand:
arguments:
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']
tags: [ 'console.command' ]
CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository:
lazy: true

View File

@@ -47,7 +47,8 @@ class DataflowSchemaProvider
$tableSchedule->addColumn('next', 'datetime', ['notnull' => false]);
$tableSchedule->addColumn('enabled', 'boolean', ['notnull' => true]);
$tableJob->addForeignKeyConstraint($tableSchedule, ['scheduled_dataflow_id'], ['id']);
$tableJob->addForeignKeyConstraint($tableSchedule->getName(), ['scheduled_dataflow_id'], ['id']);
$tableJob->addIndex(['status'], 'idx_status');
return $schema;
}

View File

@@ -13,7 +13,7 @@ class FrequencyValidator extends ConstraintValidator
/**
* {@inheritdoc}
*/
public function validate($value, Constraint $constraint)
public function validate(mixed $value, Constraint $constraint)
{
if (!$constraint instanceof Frequency) {
throw new UnexpectedTypeException($constraint, Frequency::class);
@@ -23,7 +23,12 @@ class FrequencyValidator extends ConstraintValidator
return;
}
$interval = @\DateInterval::createFromDateString($value);
try {
$interval = \DateInterval::createFromDateString($value);
} catch (\Exception){
$interval = false;
}
if (!$interval) {
$this->context->buildViolation($constraint->message)
->setParameter('{{ string }}', $value)
@@ -42,8 +47,6 @@ class FrequencyValidator extends ConstraintValidator
->setParameter('{{ string }}', $value)
->addViolation()
;
return;
}
}
}