10 Commits

Author SHA1 Message Date
jeremycr
d89d7b72b0 Updated changelog for 3.1.0 (#64) 2021-04-21 10:40:27 +02:00
jeremycr
3cf05b555d Added messenger mode (#63) 2021-04-21 09:35:39 +02:00
Mathieu Ledru
9624f68675 Introduce the possibility to add asynchronous steps (#61) 2021-04-21 09:20:48 +02:00
jeremycr
b191e33c47 Added PHP 8 support and DBAL 3 support (#59) 2021-03-23 09:06:04 +01:00
jbcr
8bb55b1303 Update CHANGELOG.md (#56) 2021-01-15 15:18:36 +01:00
jeremycr
f0459462f7 Improved logging (#55)
* Improved logging

* Better handling of default logger

* Update src/DataflowType/Dataflow/Dataflow.php

* Updated README

Co-authored-by: jbcr <51637606+jbcr@users.noreply.github.com>
2021-01-15 14:38:48 +01:00
jeremycr
5a76c11bc6 Changelog for v2.1.1 (#54) 2020-12-02 15:44:23 +01:00
mdavid1297
d7efd85c8e Fix bug DateTime Oneshot (#53)
Co-authored-by: Marc DAVID <marc@Mac.local>
2020-12-02 15:13:27 +01:00
Jean-Baptiste Nahan
b0d17c31cc Fix some error with Symfony 5 and Symfoy 3.4 (#52)
* add Symfony 5 configuration initialisation with backward compatibility

* fix return value at end of command
2020-09-16 14:23:40 +02:00
Arnaud Lafon
e72d0d5e8d Bumped sf/di requirements to 4.1.12 minimum when 4.x used (#50) 2020-04-01 11:21:00 +02:00
35 changed files with 1279 additions and 316 deletions

View File

@@ -23,66 +23,76 @@ env:
matrix:
fast_finish: true
include:
- php: '7.1'
- php: '7.2'
- php: '7.3'
- php: '7.4'
- php: '8.0'
# Enable code coverage with the previous supported PHP version
# - php: '7.4'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Enable code coverage with the latest supported PHP version
- php: '7.3'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# - php: '8.0'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Minimum supported dependencies with the latest and oldest supported PHP versions
- php: '7.1'
env:
- COMPOSER_FLAGS="--prefer-lowest"
- php: '7.3'
env:
- COMPOSER_FLAGS="--prefer-lowest"
# Incompatibility between lowest symfony testing utils and phpunit
# - php: '8.0'
# env:
# - COMPOSER_FLAGS="--prefer-lowest"
# Test each supported Symfony version with lowest supported PHP version
- php: '7.1'
env:
- SYMFONY_VERSION=3.4.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.3.*
- php: '7.1'
# - php: '7.3'
# env:
# - SYMFONY_VERSION=3.4.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.4.*
- php: '7.2'
- php: '7.3'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.0.*
- SYMFONY_VERSION=5.2.*
# Test unsupported versions of Symfony
- php: '7.1'
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.1.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.2.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.3.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.0.*
- php: '7.1'
- SYMFONY_VERSION=5.0.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.1.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
- SYMFONY_VERSION=5.1.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
- php: '7.2'
env:
- STABILITY=dev
- SYMFONY_VERSION=5.1.*
# - php: '7.2'
# env:
# - STABILITY=dev
# - SYMFONY_VERSION=5.3.*
# Test upcoming PHP versions with dev dependencies
- php: '7.4snapshot'
env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
#- php: '7.5snapshot'
# env:
# - STABILITY=dev
# - COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
allow_failures:
# 4.0 not supported because of https://github.com/advisories/GHSA-pgwj-prpq-jpc2
- env:
- SYMFONY_VERSION=4.0.*
- env:
@@ -95,6 +105,9 @@ matrix:
- env:
- STABILITY=dev
- SYMFONY_VERSION=5.1.*
- env:
- STABILITY=dev
- SYMFONY_VERSION=5.2.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
@@ -108,11 +121,11 @@ before_install:
phpenv config-rm xdebug.ini || true;
fi
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
travis_retry composer require --dev satooshi/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
travis_retry composer require --dev php-coveralls/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
fi
install:
- travis_retry composer update --prefer-dist --no-interaction --no-suggest --no-progress --ansi $COMPOSER_FLAGS
- travis_retry composer update --prefer-dist --no-interaction --no-progress --ansi $COMPOSER_FLAGS
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS

View File

@@ -1,3 +1,23 @@
# Version 3.1.0
* Added optional "messenger mode", to delegate jobs execution to workers from the Symfony messenger component
* Added support for asynchronous steps execution, using the AMPHP library (contribution from [matyo91](https://github.com/matyo91))
# Version 3.0.0
* Added PHP 8 support
* PHP minimum requirements bumped to 7.3
* Added Doctrine DBAL 3 support
* Doctrine DBAL minimum requirements bumped to 2.12
# Version 2.2.0
* Improve logging Dataflow job
# Version 2.1.1
* Fixed some Symfony 5 compatibility issues
# Version 2.1.0
* Added CollectionWriter and DelegatorWriter

View File

@@ -3,13 +3,13 @@
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
[![Build Status](https://travis-ci.org/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.org/code-rhapsodie/dataflow-bundle)
[![Build Status](https://travis-ci.com/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps
* any number of steps that can be synchronous or asynchronous
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
@@ -35,6 +35,10 @@ As the following schema shows, you can define more than one dataflow:
## Installation
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
### Add the dependency
To install this bundle, run this command :
@@ -126,6 +130,39 @@ code_rhapsodie_dataflow:
dbal_default_connection: test #Name of the default connection used by Dataflow bundle
```
By default, the `logger` service will be used to log all exceptions and custom messages.
If you want to use another logger, like a specific Monolog handler, Add this configuration:
```yaml
code_rhapsodie_dataflow:
default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
```
### Messenger mode
Dataflow can delegate the execution of its jobs to the Symfony messenger component, if available.
This allows jobs to be executed concurrently by workers instead of sequentially.
To enable messenger mode:
```yaml
code_rhapsodie_dataflow:
messenger_mode:
enabled: true
# bus: 'messenger.default_bus' #Service ID of the bus you want Dataflow to use, if not the default one
```
You also need to route Dataflow messages to the proper transport:
```yaml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
@@ -204,6 +241,7 @@ If you're using Symfony auto-configuration for your services, this tag will be a
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
@@ -235,6 +273,34 @@ class MyFirstDataflowType extends AbstractDataflowType
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
For asynchronous management, `AbstractDataflowType` come with two default options :
- loopInterval : default to 0. Update this interval if you wish customise the `tick` loop duration.
- emitInterval : default to 0. Update this interval to have a control when reader must emit new data in the flow pipeline.
### Logging
All exceptions will be caught and written in the logger.
If you want to add custom messages in the log, you can inject the logger in your readers / steps / writers.
If your DataflowType class extends `AbstractDataflowType`, the logger is accessible as `$this->logger`.
```php
<?php
// ...
use Symfony\Component\OptionsResolver\OptionsResolver;
class MyDataflowType extends AbstractDataflowType
{
// ...
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$this->myWriter->setLogger($this->logger);
}
}
```
When using the `code-rhapsodie:dataflow:run-pending` command, this logger will also be used to save the log in the corresponding job in the database.
### Check if your DataflowType is ready
Execute this command to check if your DataflowType is correctly registered:
@@ -304,6 +370,7 @@ $builder->setReader(($this->myReader)())
*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
- converters, that alter the element
- filters, that conditionally prevent further operations on the element
- generators, that can include asynchronous operations
A *Step* can be any callable, taking the element as its argument, and returning either:
- the element, possibly altered
@@ -321,6 +388,16 @@ $builder->addStep(function ($item) {
return $item;
});
// asynchronous step with 2 scale factor
$builder->addStep(function ($item): \Generator {
yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long
// Titles are changed to all caps before export
$item['title'] = strtolower($item['title']);
return $item;
}, 2);
$builder->addStep(function ($item) {
// Private items are not exported
if ($item['private']) {
@@ -332,6 +409,8 @@ $builder->addStep(function ($item) {
//[...]
```
Note : you can ensure writing order for asynchronous operations if all steps are scaled at 1 factor.
### Writers
*Writers* perform the actual import / export operations.
@@ -491,6 +570,8 @@ Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:run-pending` Executes job in the queue according to their schedule.
When messenger mode is enabled, jobs will still be created according to their schedule, but execution will be handled by the messenger component instead.
`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
`code-rhapsodie:dataflow:schedule:change-status` Enable or disable a scheduled dataflow

View File

@@ -0,0 +1,49 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Dataflow;
use Amp\Delayed;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use PHPUnit\Framework\TestCase;
class AMPAsyncDataflowTest extends TestCase
{
public function testProcess()
{
$reader = [1, 2, 3];
$result = [];
$dataflow = new AMPAsyncDataflow($reader, 'simple');
$dataflow->addStep(static function($item) {
return $item + 1;
});
$dataflow->addStep(static function($item): \Generator {
yield new Delayed(10); //delay 10 milliseconds
return $item * 2;
});
$dataflow->addWriter(new class($result) implements WriterInterface {
private $buffer;
public function __construct(&$buffer) {
$this->buffer = &$buffer;
}
public function prepare()
{
}
public function write($item)
{
$this->buffer[] = $item;
}
public function finish()
{
}
});
$dataflow->process();
self::assertSame([4, 6, 8], $result);
}
}

View File

@@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Tests\MessengerMode;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class JobMessageHandlerTest extends TestCase
{
/** @var JobRepository|MockObject */
private $repository;
/** @var JobProcessorInterface|MockObject */
private $processor;
/** @var JobMessageHandler */
private $handler;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->handler = new JobMessageHandler($this->repository, $this->processor);
}
public function testGetHandledMessages()
{
$this->assertSame([JobMessage::class], JobMessageHandler::getHandledMessages());
}
public function testInvoke()
{
$message = new JobMessage($id = 32);
$this->repository
->expects($this->once())
->method('find')
->with($id)
->willReturn($job = new Job())
;
$this->processor
->expects($this->once())
->method('process')
->with($job)
;
($this->handler)($message);
}
}

View File

@@ -0,0 +1,120 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessorTest extends TestCase
{
/** @var JobProcessor */
private $processor;
/** @var JobRepository|MockObject */
private $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
}
public function testProcess()
{
$now = new \DateTimeImmutable();
$job = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type = 'type')
->setOptions($options = ['option1' => 'value1'])
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
],
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
Events::AFTER_PROCESSING,
],
);
}
$dataflowType = $this->createMock(DataflowTypeInterface::class);
$this->registry
->expects($this->once())
->method('getDataflowType')
->with($type)
->willReturn($dataflowType)
;
$bag = [new \Exception('message1')];
$result = new Result('name', new \DateTimeImmutable(), $end = new \DateTimeImmutable(), $count = 10, $bag);
$dataflowType
->expects($this->once())
->method('process')
->with($options)
->willReturn($result)
;
$this->repository
->expects($this->exactly(2))
->method('save')
;
$this->processor->process($job);
$this->assertGreaterThanOrEqual($now, $job->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job->getStatus());
$this->assertSame($end, $job->getEndTime());
$this->assertSame($count - count($bag), $job->getCount());
}
}

View File

@@ -0,0 +1,72 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunnerTest extends TestCase
{
/** @var MessengerDataflowRunner */
private $runner;
/** @var JobRepository|MockObject */
private $repository;
/** @var MessageBusInterface|MockObject */
private $bus;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->bus = $this->createMock(MessageBusInterface::class);
$this->runner = new MessengerDataflowRunner($this->repository, $this->bus);
}
public function testRunPendingDataflows()
{
$job1 = (new Job())->setId($id1 = 10);
$job2 = (new Job())->setId($id2 = 20);
$this->repository
->expects($this->exactly(3))
->method('findNextPendingDataflow')
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$this->repository
->expects($this->exactly(2))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->bus
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive([
$this->callback(function ($message) use ($id1) {
return $message instanceof JobMessage && $message->getJobId() === $id1;
})
], [
$this->callback(function ($message) use ($id2) {
return $message instanceof JobMessage && $message->getJobId() === $id2;
})
])
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),
new Envelope(new JobMessage($id2))
)
;
$this->runner->runPendingDataflows();
$this->assertSame(Job::STATUS_QUEUED, $job1->getStatus());
$this->assertSame(Job::STATUS_QUEUED, $job2->getStatus());
}
}

View File

@@ -2,18 +2,12 @@
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use Doctrine\ORM\EntityManagerInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunnerTest extends TestCase
{
@@ -23,34 +17,21 @@ class PendingDataflowRunnerTest extends TestCase
/** @var JobRepository|MockObject */
private $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
/** @var JobProcessorInterface|MockObject */
private $processor;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->runner = new PendingDataflowRunner($this->repository, $this->registry, $this->dispatcher);
$this->runner = new PendingDataflowRunner($this->repository, $this->processor);
}
public function testRunPendingDataflows()
{
$now = new \DateTime();
$job1 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type1 = 'type1')
->setOptions($options1 = ['option1' => 'value1'])
;
$job2 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type2 = 'type2')
->setOptions($options2 = ['option2' => 'value2'])
;
$job1 = new Job();
$job2 = new Job();
$this->repository
->expects($this->exactly(3))
@@ -58,113 +39,12 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::AFTER_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::AFTER_PROCESSING,
]
);
}
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);
$this->registry
$this->processor
->expects($this->exactly(2))
->method('getDataflowType')
->withConsecutive([$type1], [$type2])
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
;
$bag1 = [new \Exception('message1')];
$bag2 = [new \Exception('message2')];
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
$dataflowType1
->expects($this->once())
->method('process')
->with($options1)
->willReturn($result1)
;
$dataflowType2
->expects($this->once())
->method('process')
->with($options2)
->willReturn($result2)
;
$this->repository
->expects($this->exactly(4))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->runner->runPendingDataflows();
$this->assertGreaterThanOrEqual($now, $job1->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job1->getStatus());
$this->assertSame($end1, $job1->getEndTime());
$this->assertSame($count1 - count($bag1), $job1->getCount());
$this->assertGreaterThanOrEqual($now, $job2->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job2->getStatus());
$this->assertSame($end2, $job2->getEndTime());
$this->assertSame($count2 - count($bag2), $job2->getCount());
}
}

View File

@@ -2,7 +2,12 @@
"name": "code-rhapsodie/dataflow-bundle",
"description": "Data processing framework inspired by PortPHP",
"type": "symfony-bundle",
"keywords": ["dataflow", "import", "export", "data processing"],
"keywords": [
"dataflow",
"import",
"export",
"data processing"
],
"license": "MIT",
"authors": [
{
@@ -36,24 +41,31 @@
}
},
"require": {
"php": "^7.1",
"doctrine/dbal": "^2.0",
"php": "^7.3||^8.0",
"ext-json": "*",
"doctrine/dbal": "^2.12||^3.0",
"doctrine/doctrine-bundle": "^1.0||^2.0",
"psr/log": "^1.1",
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0",
"doctrine/doctrine-bundle": "^1.0||^2.0"
"symfony/yaml": "^3.4||^4.0||^5.0"
},
"require-dev": {
"phpunit/phpunit": "^7||^8"
"amphp/amp": "^2.5",
"phpunit/phpunit": "^7||^8||^9",
"symfony/messenger": "^4.4||^5.0"
},
"suggest": {
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows."
"amphp/amp": "Provide asynchronous steps for your dataflows",
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows.",
"symfony/messenger": "Allows messenger mode, i.e. letting workers run jobs"
},
"config": {
"sort-packages": true

View File

@@ -5,7 +5,9 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle;
use CodeRhapsodie\DataflowBundle\DependencyInjection\CodeRhapsodieDataflowExtension;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\BusCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -23,6 +25,10 @@ class CodeRhapsodieDataflowBundle extends Bundle
public function build(ContainerBuilder $container)
{
$container->addCompilerPass(new DataflowTypeCompilerPass());
$container
->addCompilerPass(new DataflowTypeCompilerPass())
->addCompilerPass(new DefaultLoggerCompilerPass())
->addCompilerPass(new BusCompilerPass())
;
}
}

View File

@@ -6,20 +6,24 @@ namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
/**
* Runs one dataflow.
*
* @codeCoverageIgnore
*/
class ExecuteDataflowCommand extends Command
class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
{
use LoggerAwareTrait;
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
/** @var DataflowTypeRegistryInterface */
@@ -28,16 +32,12 @@ class ExecuteDataflowCommand extends Command
/** @var ConnectionFactory */
private $connectionFactory;
/** @var LoggerInterface */
private $logger;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
$this->logger = $logger;
}
/**
@@ -68,22 +68,22 @@ EOF
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
if ($dataflowType instanceof LoggerAwareInterface && isset($this->logger)) {
$dataflowType->setLogger($this->logger);
}
$result = $dataflowType->process($options);
$output->writeln('Executed: '.$result->getName());
$output->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$output->writeln('Success: '.$result->getSuccessCount());
$io->writeln('Executed: '.$result->getName());
$io->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
$io->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$io->writeln('Success: '.$result->getSuccessCount());
if ($result->hasErrors() > 0) {
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
foreach ($result->getExceptions() as $e) {
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
}
if ($result->hasErrors()) {
$io->error("Errors: {$result->getErrorCount()}\nExceptions traces are available in the logs.");
return 1;
}

View File

@@ -95,5 +95,7 @@ class SchemaCommand extends Command
foreach ($sqls as $sql) {
$io->text($sql.';');
}
return 0;
}
}

View File

@@ -0,0 +1,83 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
/** @var int */
protected $loopInterval;
/** @var int */
protected $emitInterval;
public function __construct(?int $loopInterval = 0, ?int $emitInterval = 0)
{
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
}
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var array */
private $steps = [];
/** @var WriterInterface[] */
private $writers = [];
public function setName(string $name): self
{
$this->name = $name;
return $this;
}
public function setReader(iterable $reader): self
{
$this->reader = $reader;
return $this;
}
public function addStep(callable $step, int $priority = 0, int $scale = 1): self
{
$this->steps[$priority][] = ['step' => $step, 'scale' => $scale];
return $this;
}
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new AMPAsyncDataflow($this->reader, $this->name, $this->loopInterval, $this->emitInterval);
krsort($this->steps);
foreach ($this->steps as $stepArray) {
foreach ($stepArray as $step) {
$dataflow->addStep($step['step'], $step['scale']);
}
}
foreach ($this->writers as $writer) {
$dataflow->addWriter($writer);
}
return $dataflow;
}
}

View File

@@ -4,10 +4,15 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
abstract class AbstractDataflowType implements DataflowTypeInterface
abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/**
* @codeCoverageIgnore
*/
@@ -22,15 +27,22 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
$this->configureOptions($optionsResolver);
$options = $optionsResolver->resolve($options);
$builder = (new DataflowBuilder())
->setName($this->getLabel())
;
$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
$dataflow->setLogger($this->logger);
}
return $dataflow->process();
}
protected function createDataflowBuilder(): DataflowBuilder
{
return new DataflowBuilder();
}
/**
* @codeCoverageIgnore
*/

View File

@@ -0,0 +1,188 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use function Amp\coroutine;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use function Amp\Promise\wait;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use RuntimeException;
use Throwable;
class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var callable[] */
private $steps;
/** @var WriterInterface[] */
private $writers;
/** @var int */
private $loopInterval;
/** @var int */
private $emitInterval;
/** @var array */
private $states;
/** @var array */
private $stepsJobs;
public function __construct(iterable $reader, ?string $name, ?int $loopInterval = 0, ?int $emitInterval = 0)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
$this->states = [];
$this->stepsJobs = [];
if (!function_exists('Amp\\Promise\\wait')) {
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}
}
/**
* @param int $scale
*
* @return $this
*/
public function addStep(callable $step, $scale = 1): self
{
$this->steps[] = [$step, $scale];
return $this;
}
/**
* @return $this
*/
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
/**
* {@inheritdoc}
*/
public function process(): Result
{
$count = 0;
$exceptions = [];
$startTime = new \DateTimeImmutable();
try {
foreach ($this->writers as $writer) {
$writer->prepare();
}
$deferred = new Deferred();
$resolved = false; //missing $deferred->isResolved() in version 2.5
$producer = new Producer(function (callable $emit) {
foreach ($this->reader as $index => $item) {
yield new Delayed($this->emitInterval);
yield $emit([$index, $item]);
}
});
$watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$exceptions) {
if (yield $producer->advance()) {
$it = $producer->getCurrent();
[$index, $item] = $it;
$this->states[$index] = [$index, 0, $item];
} elseif (!$resolved && 0 === count($this->states)) {
$resolved = true;
$deferred->resolve();
}
foreach ($this->states as $state) {
$this->processState($state, $count, $exceptions);
}
});
wait($deferred->promise());
Loop::cancel($watcherId);
foreach ($this->writers as $writer) {
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
$this->logException($e);
}
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
* @param mixed $state
* @param int $count internal count reference
* @param array $exceptions internal exceptions
*/
private function processState($state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < count($this->steps)) {
if (!isset($this->stepsJobs[$stepIndex])) {
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
if (count($this->stepsJobs[$stepIndex]) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);
$promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
if ($exception) {
$exceptions[$stepIndex] = $exception;
$this->logException($exception, (string) $stepIndex);
} elseif (false === $newItem) {
unset($this->states[$readIndex]);
} else {
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
}
unset($this->stepsJobs[$stepIndex][$readIndex]);
});
}
} else {
unset($this->states[$readIndex]);
foreach ($this->writers as $writer) {
$writer->write($item);
}
++$count;
}
}
private function logException(Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
}
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
}
}

View File

@@ -6,9 +6,13 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
class Dataflow implements DataflowInterface
class Dataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
@@ -16,15 +20,17 @@ class Dataflow implements DataflowInterface
private $reader;
/** @var callable[] */
private $steps = [];
private $steps;
/** @var WriterInterface[] */
private $writers = [];
private $writers;
public function __construct(iterable $reader, ?string $name)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
}
/**
@@ -54,27 +60,33 @@ class Dataflow implements DataflowInterface
{
$count = 0;
$exceptions = [];
$startTime = new \DateTime();
$startTime = new \DateTimeImmutable();
foreach ($this->writers as $writer) {
$writer->prepare();
}
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
try {
foreach ($this->writers as $writer) {
$writer->prepare();
}
++$count;
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
$this->logException($e, (string) $index);
}
++$count;
}
foreach ($this->writers as $writer) {
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
$this->logException($e);
}
foreach ($this->writers as $writer) {
$writer->finish();
}
return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
@@ -94,4 +106,13 @@ class Dataflow implements DataflowInterface
$writer->write($item);
}
}
private function logException(\Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
}
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
}
}

View File

@@ -28,5 +28,11 @@ class CodeRhapsodieDataflowExtension extends Extension
$config = $this->processConfiguration($configuration, $configs);
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
$loader->load('messenger_services.yaml');
}
}
}

View File

@@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;
class BusCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;
}
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
if (!$container->has($bus)) {
throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
}
if (!$container->has(MessengerDataflowRunner::class)) {
return;
}
$definition = $container->findDefinition(MessengerDataflowRunner::class);
$definition->setArgument('$bus', new Reference($bus));
}
}

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
class DefaultLoggerCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
if (!$container->has($defaultLogger)) {
return;
}
foreach ([ExecuteDataflowCommand::class, JobProcessor::class] as $serviceId) {
if (!$container->has($serviceId)) {
continue;
}
$definition = $container->findDefinition($serviceId);
$definition->addMethodCall('setLogger', [new Reference($defaultLogger)]);
}
}
}

View File

@@ -6,19 +6,43 @@ namespace CodeRhapsodie\DataflowBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Messenger\MessageBusInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder()
{
$treeBuilder = new TreeBuilder();
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
if (method_exists($treeBuilder, 'getRootNode')) {
$rootNode = $treeBuilder->getRootNode();
} else {
// BC for symfony/config < 4.2
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
}
$rootNode
->children()
->scalarNode('dbal_default_connection')
->defaultValue('default')
->end()
->scalarNode('default_logger')
->defaultValue('logger')
->end()
->arrayNode('messenger_mode')
->addDefaultsIfNotSet()
->children()
->booleanNode('enabled')
->defaultFalse()
->end()
->scalarNode('bus')
->defaultValue('messenger.default_bus')
->end()
->end()
->validate()
->ifTrue(static function ($v): bool { return $v['enabled'] && !interface_exists(MessageBusInterface::class); })
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->end()
;

View File

@@ -16,6 +16,7 @@ class Job
const STATUS_PENDING = 0;
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
const STATUS_QUEUED = 3;
private const KEYS = [
'id',
@@ -68,8 +69,6 @@ class Job
/**
* @var \DateTimeInterface|null
*
* @Asserts\DateTime()
*/
private $requestedDate;

View File

@@ -9,4 +9,12 @@ namespace CodeRhapsodie\DataflowBundle\Exceptions;
*/
class UnknownDataflowTypeException extends \Exception
{
public static function create(string $aliasOrFqcn, array $knownDataflowTypes): self
{
return new self(sprintf(
'Unknown dataflow type FQCN or alias "%s". Registered dataflow types FQCN and aliases are %s.',
$aliasOrFqcn,
implode(', ', $knownDataflowTypes)
));
}
}

View File

@@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Logger;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use Monolog\Logger;
class BufferHandler extends AbstractProcessingHandler
{
private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n";
private $buffer;
public function __construct($level = Logger::DEBUG, bool $bubble = true)
{
parent::__construct($level, $bubble);
$this->buffer = [];
}
public function clearBuffer(): array
{
$logs = $this->buffer;
$this->buffer = [];
return $logs;
}
protected function write(array $record): void
{
$this->buffer[] = $record['formatted'];
}
protected function getDefaultFormatter(): FormatterInterface
{
return new LineFormatter(self::FORMAT);
}
}

View File

@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Logger;
use Psr\Log\AbstractLogger;
use Psr\Log\LoggerInterface;
final class DelegatingLogger extends AbstractLogger
{
/** @var LoggerInterface[] */
private $loggers;
public function __construct(iterable $loggers)
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, get_class($logger)));
}
$this->loggers[] = $logger;
}
}
public function log($level, $message, array $context = [])
{
foreach ($this->loggers as $logger) {
$logger->log($level, $message, $context);
}
}
}

View File

@@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
class JobMessage
{
/** @var int */
private $jobId;
public function __construct(int $jobId)
{
$this->jobId = $jobId;
}
public function getJobId(): int
{
return $this->jobId;
}
}

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class JobMessageHandler implements MessageSubscriberInterface
{
/** @var JobRepository */
private $repository;
/** @var JobProcessorInterface */
private $processor;
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->processor = $processor;
}
public function __invoke(JobMessage $message)
{
$this->processor->process($this->repository->find($message->getJobId()));
}
public static function getHandledMessages(): iterable
{
return [JobMessage::class];
}
}

View File

@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
}
public function process(Job $job): void
{
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}
}

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\Entity\Job;
interface JobProcessorInterface
{
public function process(Job $job): void;
}

View File

@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw new UnknownDataflowTypeException($fqcnOrAlias);
throw UnknownDataflowTypeException::create($fqcnOrAlias, array_merge(array_keys($this->fqcnRegistry), array_keys($this->aliasesRegistry)));
}
/**

View File

@@ -7,6 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
/**
@@ -21,15 +22,15 @@ class JobRepository
public const TABLE_NAME = 'cr_dataflow_job';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'status' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'id' => ParameterType::INTEGER,
'status' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'requested_date' => 'datetime',
'scheduled_dataflow_id' => \PDO::PARAM_INT,
'count' => \PDO::PARAM_INT,
'exceptions' => \PDO::PARAM_STR,
'scheduled_dataflow_id' => ParameterType::INTEGER,
'count' => ParameterType::INTEGER,
'exceptions' => ParameterType::STRING,
'start_time' => 'datetime',
'end_time' => 'datetime',
];
@@ -51,7 +52,7 @@ class JobRepository
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($jobId, ParameterType::INTEGER)))
;
return $this->returnFirstOrNull($qb);
@@ -62,12 +63,12 @@ class JobRepository
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -76,8 +77,8 @@ class JobRepository
{
$qb = $this->createQueryBuilder();
$qb
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)));
->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($scheduled->getId(), ParameterType::INTEGER)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
return $this->returnFirstOrNull($qb);
}
@@ -86,7 +87,7 @@ class JobRepository
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->lte('requested_date', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, \PDO::PARAM_INT)))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)))
->orderBy('requested_date', 'ASC')
->setMaxResults(1)
;
@@ -97,7 +98,7 @@ class JobRepository
public function findLastForDataflowId(int $dataflowId): ?Job
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($dataflowId, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(1)
;
@@ -115,7 +116,7 @@ class JobRepository
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($row);
}
}
@@ -123,14 +124,14 @@ class JobRepository
public function findForScheduled(int $id): iterable
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield Job::createFromArray($row);
}
}
@@ -172,6 +173,6 @@ class JobRepository
return null;
}
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
return Job::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
}

View File

@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
/**
@@ -20,13 +21,13 @@ class ScheduledDataflowRepository
public const TABLE_NAME = 'cr_dataflow_scheduled';
private const FIELDS_TYPE = [
'id' => \PDO::PARAM_INT,
'label' => \PDO::PARAM_STR,
'dataflow_type' => \PDO::PARAM_STR,
'options' => \PDO::PARAM_STR,
'frequency' => \PDO::PARAM_STR,
'id' => ParameterType::INTEGER,
'label' => ParameterType::STRING,
'dataflow_type' => ParameterType::STRING,
'options' => ParameterType::STRING,
'frequency' => ParameterType::STRING,
'next' => 'datetime',
'enabled' => \PDO::PARAM_BOOL,
'enabled' => ParameterType::BOOLEAN,
];
/**
* @var \Doctrine\DBAL\Connection
@@ -52,13 +53,13 @@ class ScheduledDataflowRepository
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->orderBy('next', 'ASC')
;
;
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -66,7 +67,7 @@ class ScheduledDataflowRepository
public function find(int $scheduleId): ?ScheduledDataflow
{
$qb = $this->createQueryBuilder();
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, \PDO::PARAM_INT)))
$qb->andWhere($qb->expr()->eq('id', $qb->createNamedParameter($scheduleId, ParameterType::INTEGER)))
->setMaxResults(1)
;
@@ -82,7 +83,7 @@ class ScheduledDataflowRepository
if (0 === $stmt->rowCount()) {
return [];
}
while (false !== ($row = $stmt->fetch(\PDO::FETCH_ASSOC))) {
while (false !== ($row = $stmt->fetchAssociative())) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
}
}
@@ -96,7 +97,7 @@ class ScheduledDataflowRepository
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->execute()->fetchAll(\PDO::FETCH_ASSOC);
return $query->execute()->fetchAllAssociative();
}
public function save(ScheduledDataflow $scheduledDataflow)
@@ -147,6 +148,6 @@ class ScheduledDataflowRepository
return null;
}
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetch(\PDO::FETCH_ASSOC))));
return ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($stmt->fetchAssociative())));
}
}

View File

@@ -0,0 +1,12 @@
services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$bus: ~ # Filled in compiler pass
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
tags: ['messenger.message_handler']

View File

@@ -23,7 +23,6 @@ services:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
$logger: '@logger'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
@@ -77,6 +76,12 @@ services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessor'
CodeRhapsodie\DataflowBundle\Processor\JobProcessor:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var JobRepository */
private $repository;
/** @var MessageBusInterface */
private $bus;
public function __construct(JobRepository $repository, MessageBusInterface $bus)
{
$this->repository = $repository;
$this->bus = $bus;
}
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->bus->dispatch(new JobMessage($job->getId()));
$job->setStatus(Job::STATUS_QUEUED);
$this->repository->save($job);
}
}
}

View File

@@ -4,30 +4,21 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var JobProcessorInterface */
private $processor;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
$this->processor = $processor;
}
/**
@@ -36,52 +27,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$result = $dataflowType->process($job->getOptions());
$this->afterProcessing($job, $result);
}
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result): void
{
$exceptions = [];
/** @var \Exception $exception */
foreach ($result->getExceptions() as $exception) {
$exceptions[] = (string) $exception;
}
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($exceptions)
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
$this->processor->process($job);
}
}
}