3 Commits

Author SHA1 Message Date
jeremycr
d89d7b72b0 Updated changelog for 3.1.0 (#64) 2021-04-21 10:40:27 +02:00
jeremycr
3cf05b555d Added messenger mode (#63) 2021-04-21 09:35:39 +02:00
Mathieu Ledru
9624f68675 Introduce the possibility to add asynchronous steps (#61) 2021-04-21 09:20:48 +02:00
28 changed files with 953 additions and 240 deletions

View File

@@ -28,18 +28,18 @@ matrix:
- php: '8.0'
# Enable code coverage with the previous supported PHP version
- php: '7.4'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# - php: '7.4'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Enable code coverage with the latest supported PHP version
- php: '8.0'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# - php: '8.0'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Minimum supported dependencies with the latest and oldest supported PHP versions
- php: '7.3'
@@ -51,9 +51,9 @@ matrix:
# - COMPOSER_FLAGS="--prefer-lowest"
# Test each supported Symfony version with lowest supported PHP version
- php: '7.3'
env:
- SYMFONY_VERSION=3.4.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=3.4.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.4.*
@@ -63,15 +63,15 @@ matrix:
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.2.*
# Test unsupported versions of Symfony
- php: '7.3'
env:
- SYMFONY_VERSION=4.1.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.2.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.3.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.1.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.2.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.3.*
- php: '7.3'
env:
- SYMFONY_VERSION=5.0.*

View File

@@ -1,3 +1,8 @@
# Version 3.1.0
* Added optional "messenger mode", to delegate jobs execution to workers from the Symfony messenger component
* Added support for asynchronous steps execution, using the AMPHP library (contribution from [matyo91](https://github.com/matyo91))
# Version 3.0.0
* Added PHP 8 support

View File

@@ -9,7 +9,7 @@ providing an easy way to create import / export dataflow.
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps
* any number of steps that can be synchronous or asynchronous
* one or more writers
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
@@ -138,6 +138,31 @@ code_rhapsodie_dataflow:
default_logger: monolog.logger.custom #Service ID of the logger you want Dataflow to use
```
### Messenger mode
Dataflow can delegate the execution of its jobs to the Symfony messenger component, if available.
This allows jobs to be executed concurrently by workers instead of sequentially.
To enable messenger mode:
```yaml
code_rhapsodie_dataflow:
messenger_mode:
enabled: true
# bus: 'messenger.default_bus' #Service ID of the bus you want Dataflow to use, if not the default one
```
You also need to route Dataflow messages to the proper transport:
```yaml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
```
## Define a dataflow type
This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.
@@ -216,6 +241,7 @@ If you're using Symfony auto-configuration for your services, this tag will be a
Otherwise, manually add the tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
```yaml
```yaml
CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
tags:
@@ -247,6 +273,10 @@ class MyFirstDataflowType extends AbstractDataflowType
With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
For asynchronous management, `AbstractDataflowType` come with two default options :
- loopInterval : default to 0. Update this interval if you wish customise the `tick` loop duration.
- emitInterval : default to 0. Update this interval to have a control when reader must emit new data in the flow pipeline.
### Logging
All exceptions will be caught and written in the logger.
@@ -340,6 +370,7 @@ $builder->setReader(($this->myReader)())
*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
- converters, that alter the element
- filters, that conditionally prevent further operations on the element
- generators, that can include asynchronous operations
A *Step* can be any callable, taking the element as its argument, and returning either:
- the element, possibly altered
@@ -357,6 +388,16 @@ $builder->addStep(function ($item) {
return $item;
});
// asynchronous step with 2 scale factor
$builder->addStep(function ($item): \Generator {
yield new \Amp\Delayed(1000); // asynchronous processing for 1 second long
// Titles are changed to all caps before export
$item['title'] = strtolower($item['title']);
return $item;
}, 2);
$builder->addStep(function ($item) {
// Private items are not exported
if ($item['private']) {
@@ -368,6 +409,8 @@ $builder->addStep(function ($item) {
//[...]
```
Note : you can ensure writing order for asynchronous operations if all steps are scaled at 1 factor.
### Writers
*Writers* perform the actual import / export operations.
@@ -527,6 +570,8 @@ Several commands are provided to manage schedules and run jobs.
`code-rhapsodie:dataflow:run-pending` Executes job in the queue according to their schedule.
When messenger mode is enabled, jobs will still be created according to their schedule, but execution will be handled by the messenger component instead.
`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
`code-rhapsodie:dataflow:schedule:change-status` Enable or disable a scheduled dataflow

View File

@@ -0,0 +1,49 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Dataflow;
use Amp\Delayed;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\Dataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use PHPUnit\Framework\TestCase;
class AMPAsyncDataflowTest extends TestCase
{
public function testProcess()
{
$reader = [1, 2, 3];
$result = [];
$dataflow = new AMPAsyncDataflow($reader, 'simple');
$dataflow->addStep(static function($item) {
return $item + 1;
});
$dataflow->addStep(static function($item): \Generator {
yield new Delayed(10); //delay 10 milliseconds
return $item * 2;
});
$dataflow->addWriter(new class($result) implements WriterInterface {
private $buffer;
public function __construct(&$buffer) {
$this->buffer = &$buffer;
}
public function prepare()
{
}
public function write($item)
{
$this->buffer[] = $item;
}
public function finish()
{
}
});
$dataflow->process();
self::assertSame([4, 6, 8], $result);
}
}

View File

@@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Tests\MessengerMode;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class JobMessageHandlerTest extends TestCase
{
/** @var JobRepository|MockObject */
private $repository;
/** @var JobProcessorInterface|MockObject */
private $processor;
/** @var JobMessageHandler */
private $handler;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->handler = new JobMessageHandler($this->repository, $this->processor);
}
public function testGetHandledMessages()
{
$this->assertSame([JobMessage::class], JobMessageHandler::getHandledMessages());
}
public function testInvoke()
{
$message = new JobMessage($id = 32);
$this->repository
->expects($this->once())
->method('find')
->with($id)
->willReturn($job = new Job())
;
$this->processor
->expects($this->once())
->method('process')
->with($job)
;
($this->handler)($message);
}
}

View File

@@ -0,0 +1,120 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessorTest extends TestCase
{
/** @var JobProcessor */
private $processor;
/** @var JobRepository|MockObject */
private $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
}
public function testProcess()
{
$now = new \DateTimeImmutable();
$job = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type = 'type')
->setOptions($options = ['option1' => 'value1'])
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
],
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
Events::AFTER_PROCESSING,
],
);
}
$dataflowType = $this->createMock(DataflowTypeInterface::class);
$this->registry
->expects($this->once())
->method('getDataflowType')
->with($type)
->willReturn($dataflowType)
;
$bag = [new \Exception('message1')];
$result = new Result('name', new \DateTimeImmutable(), $end = new \DateTimeImmutable(), $count = 10, $bag);
$dataflowType
->expects($this->once())
->method('process')
->with($options)
->willReturn($result)
;
$this->repository
->expects($this->exactly(2))
->method('save')
;
$this->processor->process($job);
$this->assertGreaterThanOrEqual($now, $job->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job->getStatus());
$this->assertSame($end, $job->getEndTime());
$this->assertSame($count - count($bag), $job->getCount());
}
}

View File

@@ -0,0 +1,72 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunnerTest extends TestCase
{
/** @var MessengerDataflowRunner */
private $runner;
/** @var JobRepository|MockObject */
private $repository;
/** @var MessageBusInterface|MockObject */
private $bus;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->bus = $this->createMock(MessageBusInterface::class);
$this->runner = new MessengerDataflowRunner($this->repository, $this->bus);
}
public function testRunPendingDataflows()
{
$job1 = (new Job())->setId($id1 = 10);
$job2 = (new Job())->setId($id2 = 20);
$this->repository
->expects($this->exactly(3))
->method('findNextPendingDataflow')
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$this->repository
->expects($this->exactly(2))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->bus
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive([
$this->callback(function ($message) use ($id1) {
return $message instanceof JobMessage && $message->getJobId() === $id1;
})
], [
$this->callback(function ($message) use ($id2) {
return $message instanceof JobMessage && $message->getJobId() === $id2;
})
])
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),
new Envelope(new JobMessage($id2))
)
;
$this->runner->runPendingDataflows();
$this->assertSame(Job::STATUS_QUEUED, $job1->getStatus());
$this->assertSame(Job::STATUS_QUEUED, $job2->getStatus());
}
}

View File

@@ -2,18 +2,12 @@
namespace CodeRhapsodie\DataflowBundle\Tests\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use Doctrine\ORM\EntityManagerInterface;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunnerTest extends TestCase
{
@@ -23,34 +17,21 @@ class PendingDataflowRunnerTest extends TestCase
/** @var JobRepository|MockObject */
private $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
/** @var JobProcessorInterface|MockObject */
private $processor;
protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->processor = $this->createMock(JobProcessorInterface::class);
$this->runner = new PendingDataflowRunner($this->repository, $this->registry, $this->dispatcher);
$this->runner = new PendingDataflowRunner($this->repository, $this->processor);
}
public function testRunPendingDataflows()
{
$now = new \DateTime();
$job1 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type1 = 'type1')
->setOptions($options1 = ['option1' => 'value1'])
;
$job2 = (new Job())
->setStatus(Job::STATUS_PENDING)
->setDataflowType($type2 = 'type2')
->setOptions($options2 = ['option2' => 'value2'])
;
$job1 = new Job();
$job2 = new Job();
$this->repository
->expects($this->exactly(3))
@@ -58,113 +39,12 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::AFTER_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::AFTER_PROCESSING,
]
);
}
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);
$this->registry
$this->processor
->expects($this->exactly(2))
->method('getDataflowType')
->withConsecutive([$type1], [$type2])
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
;
$bag1 = [new \Exception('message1')];
$bag2 = [new \Exception('message2')];
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
$dataflowType1
->expects($this->once())
->method('process')
->with($options1)
->willReturn($result1)
;
$dataflowType2
->expects($this->once())
->method('process')
->with($options2)
->willReturn($result2)
;
$this->repository
->expects($this->exactly(4))
->method('save')
->withConsecutive([$job1], [$job2])
;
$this->runner->runPendingDataflows();
$this->assertGreaterThanOrEqual($now, $job1->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job1->getStatus());
$this->assertSame($end1, $job1->getEndTime());
$this->assertSame($count1 - count($bag1), $job1->getCount());
$this->assertGreaterThanOrEqual($now, $job2->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job2->getStatus());
$this->assertSame($end2, $job2->getEndTime());
$this->assertSame($count2 - count($bag2), $job2->getCount());
}
}

View File

@@ -58,10 +58,14 @@
"symfony/yaml": "^3.4||^4.0||^5.0"
},
"require-dev": {
"phpunit/phpunit": "^7||^8"
"amphp/amp": "^2.5",
"phpunit/phpunit": "^7||^8||^9",
"symfony/messenger": "^4.4||^5.0"
},
"suggest": {
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows."
"amphp/amp": "Provide asynchronous steps for your dataflows",
"portphp/portphp": "Provides generic readers, steps and writers for your dataflows.",
"symfony/messenger": "Allows messenger mode, i.e. letting workers run jobs"
},
"config": {
"sort-packages": true

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle;
use CodeRhapsodie\DataflowBundle\DependencyInjection\CodeRhapsodieDataflowExtension;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\BusCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -27,6 +28,7 @@ class CodeRhapsodieDataflowBundle extends Bundle
$container
->addCompilerPass(new DataflowTypeCompilerPass())
->addCompilerPass(new DefaultLoggerCompilerPass())
->addCompilerPass(new BusCompilerPass())
;
}
}

View File

@@ -8,7 +8,6 @@ use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;

View File

@@ -0,0 +1,83 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\AMPAsyncDataflow;
use CodeRhapsodie\DataflowBundle\DataflowType\Dataflow\DataflowInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
/** @var int */
protected $loopInterval;
/** @var int */
protected $emitInterval;
public function __construct(?int $loopInterval = 0, ?int $emitInterval = 0)
{
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
}
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var array */
private $steps = [];
/** @var WriterInterface[] */
private $writers = [];
public function setName(string $name): self
{
$this->name = $name;
return $this;
}
public function setReader(iterable $reader): self
{
$this->reader = $reader;
return $this;
}
public function addStep(callable $step, int $priority = 0, int $scale = 1): self
{
$this->steps[$priority][] = ['step' => $step, 'scale' => $scale];
return $this;
}
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new AMPAsyncDataflow($this->reader, $this->name, $this->loopInterval, $this->emitInterval);
krsort($this->steps);
foreach ($this->steps as $stepArray) {
foreach ($stepArray as $step) {
$dataflow->addStep($step['step'], $step['scale']);
}
}
foreach ($this->writers as $writer) {
$dataflow->addWriter($writer);
}
return $dataflow;
}
}

View File

@@ -27,9 +27,8 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
$this->configureOptions($optionsResolver);
$options = $optionsResolver->resolve($options);
$builder = (new DataflowBuilder())
->setName($this->getLabel())
;
$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
@@ -39,6 +38,11 @@ abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwar
return $dataflow->process();
}
protected function createDataflowBuilder(): DataflowBuilder
{
return new DataflowBuilder();
}
/**
* @codeCoverageIgnore
*/

View File

@@ -0,0 +1,188 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
use function Amp\coroutine;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use function Amp\Promise\wait;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use RuntimeException;
use Throwable;
class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var callable[] */
private $steps;
/** @var WriterInterface[] */
private $writers;
/** @var int */
private $loopInterval;
/** @var int */
private $emitInterval;
/** @var array */
private $states;
/** @var array */
private $stepsJobs;
public function __construct(iterable $reader, ?string $name, ?int $loopInterval = 0, ?int $emitInterval = 0)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
$this->states = [];
$this->stepsJobs = [];
if (!function_exists('Amp\\Promise\\wait')) {
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}
}
/**
* @param int $scale
*
* @return $this
*/
public function addStep(callable $step, $scale = 1): self
{
$this->steps[] = [$step, $scale];
return $this;
}
/**
* @return $this
*/
public function addWriter(WriterInterface $writer): self
{
$this->writers[] = $writer;
return $this;
}
/**
* {@inheritdoc}
*/
public function process(): Result
{
$count = 0;
$exceptions = [];
$startTime = new \DateTimeImmutable();
try {
foreach ($this->writers as $writer) {
$writer->prepare();
}
$deferred = new Deferred();
$resolved = false; //missing $deferred->isResolved() in version 2.5
$producer = new Producer(function (callable $emit) {
foreach ($this->reader as $index => $item) {
yield new Delayed($this->emitInterval);
yield $emit([$index, $item]);
}
});
$watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$exceptions) {
if (yield $producer->advance()) {
$it = $producer->getCurrent();
[$index, $item] = $it;
$this->states[$index] = [$index, 0, $item];
} elseif (!$resolved && 0 === count($this->states)) {
$resolved = true;
$deferred->resolve();
}
foreach ($this->states as $state) {
$this->processState($state, $count, $exceptions);
}
});
wait($deferred->promise());
Loop::cancel($watcherId);
foreach ($this->writers as $writer) {
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
$this->logException($e);
}
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
* @param mixed $state
* @param int $count internal count reference
* @param array $exceptions internal exceptions
*/
private function processState($state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < count($this->steps)) {
if (!isset($this->stepsJobs[$stepIndex])) {
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
if (count($this->stepsJobs[$stepIndex]) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);
$promise->onResolve(function (?Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
if ($exception) {
$exceptions[$stepIndex] = $exception;
$this->logException($exception, (string) $stepIndex);
} elseif (false === $newItem) {
unset($this->states[$readIndex]);
} else {
$this->states[$readIndex] = [$readIndex, $stepIndex + 1, $newItem];
}
unset($this->stepsJobs[$stepIndex][$readIndex]);
});
}
} else {
unset($this->states[$readIndex]);
foreach ($this->writers as $writer) {
$writer->write($item);
}
++$count;
}
}
private function logException(Throwable $e, ?string $index = null): void
{
if (!isset($this->logger)) {
return;
}
$this->logger->error($e, ['exception' => $e, 'index' => $index]);
}
}

View File

@@ -29,5 +29,10 @@ class CodeRhapsodieDataflowExtension extends Extension
$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
$loader->load('messenger_services.yaml');
}
}
}

View File

@@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;
class BusCompilerPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;
}
$bus = $container->getParameter('coderhapsodie.dataflow.bus');
if (!$container->has($bus)) {
throw new InvalidArgumentException(sprintf('Service "%s" not found', $bus));
}
if (!$container->has(MessengerDataflowRunner::class)) {
return;
}
$definition = $container->findDefinition(MessengerDataflowRunner::class);
$definition->setArgument('$bus', new Reference($bus));
}
}

View File

@@ -5,7 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;
use CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
@@ -22,7 +22,7 @@ class DefaultLoggerCompilerPass implements CompilerPassInterface
return;
}
foreach ([ExecuteDataflowCommand::class, PendingDataflowRunner::class] as $serviceId) {
foreach ([ExecuteDataflowCommand::class, JobProcessor::class] as $serviceId) {
if (!$container->has($serviceId)) {
continue;
}

View File

@@ -6,6 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\DependencyInjection;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Messenger\MessageBusInterface;
class Configuration implements ConfigurationInterface
{
@@ -27,6 +28,21 @@ class Configuration implements ConfigurationInterface
->scalarNode('default_logger')
->defaultValue('logger')
->end()
->arrayNode('messenger_mode')
->addDefaultsIfNotSet()
->children()
->booleanNode('enabled')
->defaultFalse()
->end()
->scalarNode('bus')
->defaultValue('messenger.default_bus')
->end()
->end()
->validate()
->ifTrue(static function ($v): bool { return $v['enabled'] && !interface_exists(MessageBusInterface::class); })
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->end()
;

View File

@@ -16,6 +16,7 @@ class Job
const STATUS_PENDING = 0;
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
const STATUS_QUEUED = 3;
private const KEYS = [
'id',

View File

@@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
class JobMessage
{
/** @var int */
private $jobId;
public function __construct(int $jobId)
{
$this->jobId = $jobId;
}
public function getJobId(): int
{
return $this->jobId;
}
}

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\MessengerMode;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class JobMessageHandler implements MessageSubscriberInterface
{
/** @var JobRepository */
private $repository;
/** @var JobProcessorInterface */
private $processor;
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->processor = $processor;
}
public function __invoke(JobMessage $message)
{
$this->processor->process($this->repository->find($message->getJobId()));
}
public static function getHandledMessages(): iterable
{
return [JobMessage::class];
}
}

View File

@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
}
public function process(Job $job): void
{
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}
}

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Processor;
use CodeRhapsodie\DataflowBundle\Entity\Job;
interface JobProcessorInterface
{
public function process(Job $job): void;
}

View File

@@ -53,7 +53,7 @@ class ScheduledDataflowRepository
$qb->andWhere($qb->expr()->lte('next', $qb->createNamedParameter(new \DateTime(), 'datetime')))
->andWhere($qb->expr()->eq('enabled', 1))
->orderBy('next', 'ASC')
;
;
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {

View File

@@ -0,0 +1,12 @@
services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$bus: ~ # Filled in compiler pass
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
tags: ['messenger.message_handler']

View File

@@ -76,6 +76,12 @@ services:
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$processor: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface'
CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface: '@CodeRhapsodie\DataflowBundle\Processor\JobProcessor'
CodeRhapsodie\DataflowBundle\Processor\JobProcessor:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'

View File

@@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var JobRepository */
private $repository;
/** @var MessageBusInterface */
private $bus;
public function __construct(JobRepository $repository, MessageBusInterface $bus)
{
$this->repository = $repository;
$this->bus = $bus;
}
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->bus->dispatch(new JobMessage($job->getId()));
$job->setStatus(Job::STATUS_QUEUED);
$this->repository->save($job);
}
}
}

View File

@@ -4,38 +4,21 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Runner;
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Monolog\Logger;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwareInterface
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var JobProcessorInterface */
private $processor;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
$this->processor = $processor;
}
/**
@@ -44,62 +27,7 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface, LoggerAwa
public function runPendingDataflows(): void
{
while (null !== ($job = $this->repository->findNextPendingDataflow())) {
$this->beforeProcessing($job);
$dataflowType = $this->registry->getDataflowType($job->getDataflowType());
$loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])];
if (isset($this->logger)) {
$loggers[] = $this->logger;
}
$logger = new DelegatingLogger($loggers);
if ($dataflowType instanceof LoggerAwareInterface) {
$dataflowType->setLogger($logger);
}
$result = $dataflowType->process($job->getOptions());
if (!$dataflowType instanceof LoggerAwareInterface) {
foreach ($result->getExceptions() as $index => $e) {
$logger->error($e, ['index' => $index]);
}
}
$this->afterProcessing($job, $result, $bufferHandler);
}
}
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
}
private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
{
$job
->setEndTime($result->getEndTime())
->setStatus(Job::STATUS_COMPLETED)
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
$this->processor->process($job);
}
}
}