mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a76c11bc6 | ||
|
|
d7efd85c8e | ||
|
|
b0d17c31cc | ||
|
|
e72d0d5e8d | ||
|
|
a5518c80e2 | ||
|
|
bd9171ad53 | ||
|
|
5b4b3f1b6f | ||
|
|
d1330ae638 | ||
|
|
42b242ee6c | ||
|
|
4d98adfe0a | ||
|
|
c5fc6adf08 | ||
|
|
8efb4bd2d9 | ||
|
|
a9b19d933a | ||
|
|
ca946429b1 | ||
|
|
26ac98eb98 |
43
.travis.yml
43
.travis.yml
@@ -26,10 +26,19 @@ matrix:
|
||||
- php: '7.1'
|
||||
- php: '7.2'
|
||||
- php: '7.3'
|
||||
- php: '7.4'
|
||||
|
||||
# Enable code coverage with the latest supported PHP version
|
||||
# Enable code coverage with the previous supported PHP version
|
||||
- php: '7.3'
|
||||
env:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
|
||||
# Enable code coverage with the latest supported PHP version
|
||||
- php: '7.4'
|
||||
env:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
|
||||
@@ -47,43 +56,51 @@ matrix:
|
||||
- SYMFONY_VERSION=3.4.*
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.2.*
|
||||
- SYMFONY_VERSION=4.3.*
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.3.*
|
||||
- SYMFONY_VERSION=4.4.*
|
||||
- php: '7.2'
|
||||
env:
|
||||
- COVERALLS_ENABLED="true"
|
||||
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
|
||||
- SYMFONY_VERSION=5.0.*
|
||||
|
||||
# Test unsupported versions of Symfony
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.0.*
|
||||
- SYMFONY_VERSION=4.1.*
|
||||
- php: '7.1'
|
||||
env:
|
||||
- SYMFONY_VERSION=4.1.*
|
||||
- SYMFONY_VERSION=4.2.*
|
||||
|
||||
|
||||
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
|
||||
- php: '7.1'
|
||||
- php: '7.2'
|
||||
env:
|
||||
- STABILITY=dev
|
||||
- SYMFONY_VERSION=4.4.*
|
||||
- SYMFONY_VERSION=5.1.*
|
||||
|
||||
# Test upcoming PHP versions with dev dependencies
|
||||
- php: '7.4snapshot'
|
||||
env:
|
||||
- STABILITY=dev
|
||||
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
|
||||
#- php: '7.5snapshot'
|
||||
# env:
|
||||
# - STABILITY=dev
|
||||
# - COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
|
||||
|
||||
allow_failures:
|
||||
# 4.0 not supported because of https://github.com/advisories/GHSA-pgwj-prpq-jpc2
|
||||
- env:
|
||||
- SYMFONY_VERSION=4.0.*
|
||||
- env:
|
||||
- SYMFONY_VERSION=4.1.*
|
||||
- env:
|
||||
- SYMFONY_VERSION=4.2.*
|
||||
- env:
|
||||
- STABILITY=dev
|
||||
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
|
||||
- env:
|
||||
- STABILITY=dev
|
||||
- SYMFONY_VERSION=4.4.*
|
||||
- SYMFONY_VERSION=5.1.*
|
||||
|
||||
before_install:
|
||||
- if [[ "$SYMFONY_VERSION" != "" ]]; then
|
||||
@@ -108,4 +125,4 @@ script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
|
||||
after_success:
|
||||
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
|
||||
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
|
||||
fi;
|
||||
fi;
|
||||
|
||||
11
CHANGELOG.md
11
CHANGELOG.md
@@ -1,3 +1,14 @@
|
||||
# Version 2.1.1
|
||||
|
||||
* Fixed some Symfony 5 compatibility issues
|
||||
|
||||
# Version 2.1.0
|
||||
|
||||
* Added CollectionWriter and DelegatorWriter
|
||||
* Adding Symfony 5.0 compatibility
|
||||
* Save all exceptions caught in the log for `code-rhapsodie:dataflow:execute`
|
||||
* Added more output when errors occured during `code-rhapsodie:dataflow:execute`
|
||||
|
||||
# Version 2.0.2
|
||||
|
||||
* Fixed the connection proxy class created by the factory
|
||||
|
||||
95
README.md
95
README.md
@@ -3,7 +3,7 @@
|
||||
DataflowBundle is a bundle for Symfony 3.4+
|
||||
providing an easy way to create import / export dataflow.
|
||||
|
||||
[](https://travis-ci.org/code-rhapsodie/dataflow-bundle)
|
||||
[](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
|
||||
|
||||
[](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
|
||||
|
||||
@@ -35,6 +35,10 @@ As the following schema shows, you can define more than one dataflow:
|
||||
|
||||
## Installation
|
||||
|
||||
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
|
||||
|
||||
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
|
||||
|
||||
### Add the dependency
|
||||
|
||||
To install this bundle, run this command :
|
||||
@@ -386,6 +390,95 @@ class FileWriter implements WriterInterface
|
||||
}
|
||||
```
|
||||
|
||||
#### CollectionWriter
|
||||
|
||||
If you want to write multiple items from a single item read, you can use the generic `CollectionWriter`. This writer will iterate over any `iterable` it receives, and pass each item from that collection to your own writer that handles single items.
|
||||
|
||||
```php
|
||||
$builder->addWriter(new CollectionWriter($mySingleItemWriter));
|
||||
```
|
||||
|
||||
#### DelegatorWriter
|
||||
|
||||
If you want to call different writers depending on what item is read, you can use the generic `DelegatorWriter`.
|
||||
|
||||
As an example, let's suppose our items are arrays with the first entry being either `product` or `order`. We want to use a different writer based on that value.
|
||||
|
||||
First, create your writers implementing `DelegateWriterInterface` (this interface extends `WriterInterface` so your writers can still be used without the `DelegatorWriter`).
|
||||
|
||||
```php
|
||||
<?php
|
||||
namespace CodeRhapsodie\DataflowExemple\Writer;
|
||||
|
||||
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
|
||||
|
||||
class ProductWriter implements DelegateWriterInterface
|
||||
{
|
||||
public function supports($item): bool
|
||||
{
|
||||
return 'product' === reset($item);
|
||||
}
|
||||
|
||||
public function prepare()
|
||||
{
|
||||
}
|
||||
|
||||
public function write($item)
|
||||
{
|
||||
// Process your product
|
||||
}
|
||||
|
||||
public function finish()
|
||||
{
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
```php
|
||||
<?php
|
||||
namespace CodeRhapsodie\DataflowExemple\Writer;
|
||||
|
||||
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
|
||||
|
||||
class OrderWriter implements DelegateWriterInterface
|
||||
{
|
||||
public function supports($item): bool
|
||||
{
|
||||
return 'order' === reset($item);
|
||||
}
|
||||
|
||||
public function prepare()
|
||||
{
|
||||
}
|
||||
|
||||
public function write($item)
|
||||
{
|
||||
// Process your order
|
||||
}
|
||||
|
||||
public function finish()
|
||||
{
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Then, configure your `DelegatorWriter` and add it to your dataflow type.
|
||||
|
||||
```php
|
||||
protected function buildDataflow(DataflowBuilder $builder, array $options): void
|
||||
{
|
||||
// Snip add reader and steps
|
||||
|
||||
$delegatorWriter = new DelegatorWriter();
|
||||
$delegatorWriter->addDelegate(new ProductWriter());
|
||||
$delegatorWriter->addDelegate(new OrderWriter());
|
||||
|
||||
$builder->addWriter($delegatorWriter);
|
||||
}
|
||||
```
|
||||
|
||||
During execution, the `DelegatorWriter` will simply pass each item received to its first delegate (in the order those were added) that supports it. If no delegate supports an item, an exception will be thrown.
|
||||
|
||||
## Queue
|
||||
|
||||
All pending dataflow job processes are stored in a queue into the database.
|
||||
|
||||
@@ -4,7 +4,6 @@ namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
|
||||
use PHPUnit\Framework\Constraint\IsIdentical;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
@@ -15,18 +14,21 @@ class AbstractDataflowTypeTest extends TestCase
|
||||
$label = 'Test label';
|
||||
$options = ['testOption' => 'Test value'];
|
||||
$values = [1, 2, 3];
|
||||
$testCase = $this;
|
||||
|
||||
$dataflowType = new class($label, $options, $values) extends AbstractDataflowType
|
||||
$dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType
|
||||
{
|
||||
private $label;
|
||||
private $options;
|
||||
private $values;
|
||||
private $testCase;
|
||||
|
||||
public function __construct(string $label, array $options, array $values)
|
||||
public function __construct(string $label, array $options, array $values, TestCase $testCase)
|
||||
{
|
||||
$this->label = $label;
|
||||
$this->options = $options;
|
||||
$this->values = $values;
|
||||
$this->testCase = $testCase;
|
||||
}
|
||||
|
||||
public function getLabel(): string
|
||||
@@ -42,7 +44,7 @@ class AbstractDataflowTypeTest extends TestCase
|
||||
protected function buildDataflow(DataflowBuilder $builder, array $options): void
|
||||
{
|
||||
$builder->setReader($this->values);
|
||||
(new IsIdentical($this->options))->evaluate($options);
|
||||
$this->testCase->assertSame($this->options, $options);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
56
Tests/DataflowType/Writer/CollectionWriterTest.php
Normal file
56
Tests/DataflowType/Writer/CollectionWriterTest.php
Normal file
@@ -0,0 +1,56 @@
|
||||
<?php
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\CollectionWriter;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
class CollectionWriterTest extends TestCase
|
||||
{
|
||||
public function testNotACollection()
|
||||
{
|
||||
$this->expectException(UnsupportedItemTypeException::class);
|
||||
|
||||
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
|
||||
$writer->write('Not an iterable');
|
||||
}
|
||||
|
||||
public function testSupports()
|
||||
{
|
||||
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
|
||||
|
||||
$this->assertTrue($writer->supports([]));
|
||||
$this->assertTrue($writer->supports(new \ArrayIterator([])));
|
||||
$this->assertFalse($writer->supports(''));
|
||||
$this->assertFalse($writer->supports(0));
|
||||
}
|
||||
|
||||
public function testAll()
|
||||
{
|
||||
$values = ['a', 'b', 'c'];
|
||||
|
||||
$embeddedWriter = $this->createMock(WriterInterface::class);
|
||||
$embeddedWriter
|
||||
->expects($this->once())
|
||||
->method('prepare')
|
||||
;
|
||||
$embeddedWriter
|
||||
->expects($this->once())
|
||||
->method('finish')
|
||||
;
|
||||
$embeddedWriter
|
||||
->expects($this->exactly(count($values)))
|
||||
->method('write')
|
||||
->withConsecutive(...array_map(function ($item) {
|
||||
return [$item];
|
||||
}, $values))
|
||||
;
|
||||
|
||||
$writer = new CollectionWriter($embeddedWriter);
|
||||
$writer->prepare();
|
||||
$writer->write($values);
|
||||
$writer->finish();
|
||||
}
|
||||
}
|
||||
160
Tests/DataflowType/Writer/DelegatorWriterTest.php
Normal file
160
Tests/DataflowType/Writer/DelegatorWriterTest.php
Normal file
@@ -0,0 +1,160 @@
|
||||
<?php
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter;
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
|
||||
use PHPUnit\Framework\MockObject\MockObject;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
class DelegatorWriterTest extends TestCase
|
||||
{
|
||||
/** @var DelegatorWriter */
|
||||
private $delegatorWriter;
|
||||
|
||||
/** @var DelegateWriterInterface|MockObject */
|
||||
private $delegateInt;
|
||||
|
||||
/** @var DelegateWriterInterface|MockObject */
|
||||
private $delegateString;
|
||||
|
||||
/** @var DelegateWriterInterface|MockObject */
|
||||
private $delegateArray;
|
||||
|
||||
protected function setUp(): void
|
||||
{
|
||||
$this->delegateInt = $this->createMock(DelegateWriterInterface::class);
|
||||
$this->delegateInt->method('supports')->willReturnCallback(function ($argument) {
|
||||
return is_int($argument);
|
||||
});
|
||||
|
||||
$this->delegateString = $this->createMock(DelegateWriterInterface::class);
|
||||
$this->delegateString->method('supports')->willReturnCallback(function ($argument) {
|
||||
return is_string($argument);
|
||||
});
|
||||
|
||||
$this->delegateArray = $this->createMock(DelegateWriterInterface::class);
|
||||
$this->delegateArray->method('supports')->willReturnCallback(function ($argument) {
|
||||
return is_array($argument);
|
||||
});
|
||||
|
||||
$this->delegatorWriter = new DelegatorWriter();
|
||||
$this->delegatorWriter->addDelegates([
|
||||
$this->delegateInt,
|
||||
$this->delegateString,
|
||||
$this->delegateArray,
|
||||
]);
|
||||
}
|
||||
|
||||
public function testUnsupported()
|
||||
{
|
||||
$this->expectException(UnsupportedItemTypeException::class);
|
||||
|
||||
$this->delegatorWriter->write(new \stdClass());
|
||||
}
|
||||
|
||||
public function testStopAtFirstSupportingDelegate()
|
||||
{
|
||||
$value = 0;
|
||||
|
||||
$this->delegateInt->expects($this->once())->method('supports');
|
||||
$this->delegateInt
|
||||
->expects($this->once())
|
||||
->method('write')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateString->expects($this->never())->method('supports');
|
||||
$this->delegateArray->expects($this->never())->method('supports');
|
||||
$this->delegateString->expects($this->never())->method('write');
|
||||
$this->delegateArray->expects($this->never())->method('write');
|
||||
|
||||
$this->delegatorWriter->write($value);
|
||||
}
|
||||
|
||||
public function testNotSupported()
|
||||
{
|
||||
$value = new \stdClass();
|
||||
|
||||
$this->delegateInt
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateString
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateArray
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
|
||||
$this->assertFalse($this->delegatorWriter->supports($value));
|
||||
}
|
||||
|
||||
public function testSupported()
|
||||
{
|
||||
$value = '';
|
||||
|
||||
$this->delegateInt
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateString
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateArray
|
||||
->expects($this->never())
|
||||
->method('supports')
|
||||
;
|
||||
|
||||
$this->assertTrue($this->delegatorWriter->supports($value));
|
||||
}
|
||||
|
||||
public function testAll()
|
||||
{
|
||||
$value = ['a'];
|
||||
|
||||
$this->delegateInt
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateString
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
$this->delegateArray
|
||||
->expects($this->once())
|
||||
->method('supports')
|
||||
->with($value)
|
||||
;
|
||||
|
||||
$this->delegateInt->expects($this->once())->method('prepare');
|
||||
$this->delegateString->expects($this->once())->method('prepare');
|
||||
$this->delegateArray->expects($this->once())->method('prepare');
|
||||
|
||||
$this->delegateInt->expects($this->once())->method('finish');
|
||||
$this->delegateString->expects($this->once())->method('finish');
|
||||
$this->delegateArray->expects($this->once())->method('finish');
|
||||
|
||||
$this->delegateInt->expects($this->never())->method('write');
|
||||
$this->delegateString->expects($this->never())->method('write');
|
||||
$this->delegateArray
|
||||
->expects($this->once())
|
||||
->method('write')
|
||||
->with($value)
|
||||
;
|
||||
|
||||
$this->delegatorWriter->prepare();
|
||||
$this->delegatorWriter->write($value);
|
||||
$this->delegatorWriter->finish();
|
||||
}
|
||||
}
|
||||
@@ -98,4 +98,44 @@ class ScheduledDataflowManagerTest extends TestCase
|
||||
|
||||
$this->assertEquals($next->add(\DateInterval::createFromDateString($frequency)), $scheduled2->getNext());
|
||||
}
|
||||
|
||||
public function testCreateJobsFromScheduledDataflowsWithError()
|
||||
{
|
||||
$scheduled1 = new ScheduledDataflow();
|
||||
|
||||
$this->scheduledDataflowRepository
|
||||
->expects($this->once())
|
||||
->method('findReadyToRun')
|
||||
->willReturn([$scheduled1])
|
||||
;
|
||||
|
||||
$this->jobRepository
|
||||
->expects($this->exactly(1))
|
||||
->method('findPendingForScheduledDataflow')
|
||||
->withConsecutive([$scheduled1])
|
||||
->willThrowException(new \Exception())
|
||||
;
|
||||
|
||||
$this->connection
|
||||
->expects($this->once())
|
||||
->method('beginTransaction')
|
||||
;
|
||||
$this->jobRepository
|
||||
->expects($this->never())
|
||||
->method('save')
|
||||
;
|
||||
|
||||
$this->connection
|
||||
->expects($this->never())
|
||||
->method('commit')
|
||||
;
|
||||
$this->connection
|
||||
->expects($this->once())
|
||||
->method('rollBack')
|
||||
;
|
||||
|
||||
$this->expectException(\Exception::class);
|
||||
|
||||
$this->manager->createJobsFromScheduledDataflows();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,36 +58,68 @@ class PendingDataflowRunnerTest extends TestCase
|
||||
->willReturnOnConsecutiveCalls($job1, $job2, null)
|
||||
;
|
||||
|
||||
$this->dispatcher
|
||||
->expects($this->exactly(4))
|
||||
->method('dispatch')
|
||||
->withConsecutive(
|
||||
[
|
||||
Events::BEFORE_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::AFTER_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::BEFORE_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::AFTER_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
})
|
||||
]
|
||||
)
|
||||
;
|
||||
// Symfony 3.4 to 4.4 call
|
||||
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
|
||||
$this->dispatcher
|
||||
->expects($this->exactly(4))
|
||||
->method('dispatch')
|
||||
->withConsecutive(
|
||||
[
|
||||
Events::BEFORE_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::AFTER_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::BEFORE_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
})
|
||||
],
|
||||
[
|
||||
Events::AFTER_PROCESSING,
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
})
|
||||
]
|
||||
);
|
||||
} else { // Symfony 5.0+
|
||||
$this->dispatcher
|
||||
->expects($this->exactly(4))
|
||||
->method('dispatch')
|
||||
->withConsecutive(
|
||||
[
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
}),
|
||||
Events::BEFORE_PROCESSING,
|
||||
],
|
||||
[
|
||||
$this->callback(function (ProcessingEvent $event) use ($job1) {
|
||||
return $event->getJob() === $job1;
|
||||
}),
|
||||
Events::AFTER_PROCESSING,
|
||||
],
|
||||
[
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
}),
|
||||
Events::BEFORE_PROCESSING,
|
||||
],
|
||||
[
|
||||
$this->callback(function (ProcessingEvent $event) use ($job2) {
|
||||
return $event->getJob() === $job2;
|
||||
}),
|
||||
Events::AFTER_PROCESSING,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
|
||||
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);
|
||||
|
||||
@@ -38,20 +38,18 @@
|
||||
"require": {
|
||||
"php": "^7.1",
|
||||
"doctrine/dbal": "^2.0",
|
||||
"seld/signal-handler": "^1.0",
|
||||
"symfony/config": "^3.4||^4.0",
|
||||
"symfony/console": "^3.4||^4.0",
|
||||
"symfony/dependency-injection": "^3.4||^4.0",
|
||||
"symfony/event-dispatcher": "^3.4||^4.0",
|
||||
"symfony/http-kernel": "^3.4||^4.0",
|
||||
"symfony/lock": "^3.4||^4.0",
|
||||
"symfony/options-resolver": "^3.4||^4.0",
|
||||
"symfony/validator": "^3.4||^4.0",
|
||||
"symfony/yaml": "^3.4||^4.0",
|
||||
"doctrine/doctrine-bundle": "^1.0"
|
||||
"symfony/config": "^3.4||^4.0||^5.0",
|
||||
"symfony/console": "^3.4||^4.0||^5.0",
|
||||
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
|
||||
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
|
||||
"symfony/http-kernel": "^3.4||^4.0||^5.0",
|
||||
"symfony/lock": "^3.4||^4.0||^5.0",
|
||||
"symfony/options-resolver": "^3.4||^4.0||^5.0",
|
||||
"symfony/validator": "^3.4||^4.0||^5.0",
|
||||
"symfony/yaml": "^3.4||^4.0||^5.0",
|
||||
"doctrine/doctrine-bundle": "^1.0||^2.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"friendsofphp/php-cs-fixer": "^2.15",
|
||||
"phpunit/phpunit": "^7||^8"
|
||||
},
|
||||
"suggest": {
|
||||
@@ -62,7 +60,8 @@
|
||||
},
|
||||
"extra": {
|
||||
"branch-alias": {
|
||||
"dev-master": "2.x-dev",
|
||||
"dev-master": "2.1.x-dev",
|
||||
"dev-v2.0.x": "2.0.x-dev",
|
||||
"dev-v1.x": "1.x-dev"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ declare(strict_types=1);
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
@@ -13,7 +14,6 @@ use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Validator\Validator\ValidatorInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
|
||||
@@ -5,6 +5,7 @@ declare(strict_types=1);
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
@@ -12,7 +13,6 @@ use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
|
||||
@@ -4,13 +4,14 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* Runs one dataflow.
|
||||
@@ -27,12 +28,16 @@ class ExecuteDataflowCommand extends Command
|
||||
/** @var ConnectionFactory */
|
||||
private $connectionFactory;
|
||||
|
||||
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
|
||||
/** @var LoggerInterface */
|
||||
private $logger;
|
||||
|
||||
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
|
||||
{
|
||||
parent::__construct();
|
||||
|
||||
$this->registry = $registry;
|
||||
$this->connectionFactory = $connectionFactory;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -72,6 +77,17 @@ EOF
|
||||
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
|
||||
$output->writeln('Success: '.$result->getSuccessCount());
|
||||
|
||||
if ($result->hasErrors() > 0) {
|
||||
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
|
||||
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
|
||||
|
||||
foreach ($result->getExceptions() as $e) {
|
||||
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,13 +5,13 @@ declare(strict_types=1);
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
|
||||
@@ -4,6 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
@@ -11,7 +12,6 @@ use Symfony\Component\Console\Command\LockableTrait;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* Runs dataflows according to user-defined schedule.
|
||||
|
||||
@@ -4,13 +4,13 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
|
||||
@@ -4,6 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Command;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
|
||||
@@ -14,7 +15,6 @@ use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
@@ -93,7 +93,9 @@ class SchemaCommand extends Command
|
||||
$io = new SymfonyStyle($input, $output);
|
||||
$io->text('Execute these SQL Queries on your database:');
|
||||
foreach ($sqls as $sql) {
|
||||
$io->text($sql);
|
||||
$io->text($sql.';');
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,10 +21,6 @@ class Dataflow implements DataflowInterface
|
||||
/** @var WriterInterface[] */
|
||||
private $writers = [];
|
||||
|
||||
/**
|
||||
* @param iterable $reader
|
||||
* @param string|null $name
|
||||
*/
|
||||
public function __construct(iterable $reader, ?string $name)
|
||||
{
|
||||
$this->reader = $reader;
|
||||
@@ -32,8 +28,6 @@ class Dataflow implements DataflowInterface
|
||||
}
|
||||
|
||||
/**
|
||||
* @param callable $step
|
||||
*
|
||||
* @return $this
|
||||
*/
|
||||
public function addStep(callable $step): self
|
||||
@@ -44,8 +38,6 @@ class Dataflow implements DataflowInterface
|
||||
}
|
||||
|
||||
/**
|
||||
* @param WriterInterface $writer
|
||||
*
|
||||
* @return $this
|
||||
*/
|
||||
public function addWriter(WriterInterface $writer): self
|
||||
@@ -71,7 +63,7 @@ class Dataflow implements DataflowInterface
|
||||
foreach ($this->reader as $index => $item) {
|
||||
try {
|
||||
$this->processItem($item);
|
||||
} catch (\Exception $e) {
|
||||
} catch (\Throwable $e) {
|
||||
$exceptions[$index] = $e;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,6 @@ interface DataflowInterface
|
||||
{
|
||||
/**
|
||||
* Processes the data.
|
||||
*
|
||||
* @return Result
|
||||
*/
|
||||
public function process(): Result;
|
||||
}
|
||||
|
||||
@@ -33,13 +33,6 @@ class Result
|
||||
/** @var array */
|
||||
private $exceptions;
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param \DateTimeInterface $startTime
|
||||
* @param \DateTimeInterface $endTime
|
||||
* @param int $totalCount
|
||||
* @param \SplObjectStorage $exceptions
|
||||
*/
|
||||
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
|
||||
{
|
||||
$this->name = $name;
|
||||
@@ -52,73 +45,46 @@ class Result
|
||||
$this->exceptions = $exceptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getName(): string
|
||||
{
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface
|
||||
*/
|
||||
public function getStartTime(): \DateTimeInterface
|
||||
{
|
||||
return $this->startTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface
|
||||
*/
|
||||
public function getEndTime(): \DateTimeInterface
|
||||
{
|
||||
return $this->endTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateInterval
|
||||
*/
|
||||
public function getElapsed(): \DateInterval
|
||||
{
|
||||
return $this->elapsed;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getErrorCount(): int
|
||||
{
|
||||
return $this->errorCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getSuccessCount(): int
|
||||
{
|
||||
return $this->successCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getTotalProcessedCount(): int
|
||||
{
|
||||
return $this->totalProcessedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function hasErrors(): bool
|
||||
{
|
||||
return $this->errorCount > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array
|
||||
*/
|
||||
public function getExceptions(): array
|
||||
{
|
||||
return $this->exceptions;
|
||||
|
||||
62
src/DataflowType/Writer/CollectionWriter.php
Normal file
62
src/DataflowType/Writer/CollectionWriter.php
Normal file
@@ -0,0 +1,62 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
|
||||
|
||||
/**
|
||||
* Delegates the writing of each item in a collection to an embedded writer.
|
||||
*/
|
||||
class CollectionWriter implements DelegateWriterInterface
|
||||
{
|
||||
/** @var WriterInterface */
|
||||
private $writer;
|
||||
|
||||
/**
|
||||
* CollectionWriter constructor.
|
||||
*/
|
||||
public function __construct(WriterInterface $writer)
|
||||
{
|
||||
$this->writer = $writer;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare()
|
||||
{
|
||||
$this->writer->prepare();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write($collection)
|
||||
{
|
||||
if (!is_iterable($collection)) {
|
||||
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', is_object($collection) ? get_class($collection) : gettype($collection)));
|
||||
}
|
||||
|
||||
foreach ($collection as $item) {
|
||||
$this->writer->write($item);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function finish()
|
||||
{
|
||||
$this->writer->finish();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function supports($item): bool
|
||||
{
|
||||
return is_iterable($item);
|
||||
}
|
||||
}
|
||||
18
src/DataflowType/Writer/DelegateWriterInterface.php
Normal file
18
src/DataflowType/Writer/DelegateWriterInterface.php
Normal file
@@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
|
||||
|
||||
/**
|
||||
* A writer that can be used as a delegate of DelegatorWriter.
|
||||
*/
|
||||
interface DelegateWriterInterface extends WriterInterface
|
||||
{
|
||||
/**
|
||||
* Returns true if the argument is of a supported type.
|
||||
*
|
||||
* @param $item
|
||||
*/
|
||||
public function supports($item): bool;
|
||||
}
|
||||
96
src/DataflowType/Writer/DelegatorWriter.php
Normal file
96
src/DataflowType/Writer/DelegatorWriter.php
Normal file
@@ -0,0 +1,96 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
|
||||
|
||||
/**
|
||||
* Writer that delegated the actual writing to other writers.
|
||||
*/
|
||||
class DelegatorWriter implements DelegateWriterInterface
|
||||
{
|
||||
/** @var DelegateWriterInterface[] */
|
||||
private $delegates;
|
||||
|
||||
/**
|
||||
* DelegatorWriter constructor.
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this->delegates = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare()
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
$delegate->prepare();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function write($item)
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
if (!$delegate->supports($item)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$delegate->write($item);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', is_object($item) ? get_class($item) : gettype($item)));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function finish()
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
$delegate->finish();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function supports($item): bool
|
||||
{
|
||||
foreach ($this->delegates as $delegate) {
|
||||
if ($delegate->supports($item)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a collection of delegates.
|
||||
*
|
||||
* @param iterable|DelegateWriterInterface[] $delegates
|
||||
*/
|
||||
public function addDelegates(iterable $delegates): void
|
||||
{
|
||||
foreach ($delegates as $delegate) {
|
||||
$this->addDelegate($delegate);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers one delegate.
|
||||
*/
|
||||
public function addDelegate(DelegateWriterInterface $delegate): void
|
||||
{
|
||||
$this->delegates[] = $delegate;
|
||||
}
|
||||
}
|
||||
@@ -4,11 +4,25 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
|
||||
|
||||
/**
|
||||
* Represents a writer for dataflows.
|
||||
*/
|
||||
interface WriterInterface
|
||||
{
|
||||
/**
|
||||
* Called before the dataflow is processed.
|
||||
*/
|
||||
public function prepare();
|
||||
|
||||
/**
|
||||
* Write an item.
|
||||
*
|
||||
* @param mixed $item
|
||||
*/
|
||||
public function write($item);
|
||||
|
||||
/**
|
||||
* Called after the dataflow is processed.
|
||||
*/
|
||||
public function finish();
|
||||
}
|
||||
|
||||
@@ -11,8 +11,13 @@ class Configuration implements ConfigurationInterface
|
||||
{
|
||||
public function getConfigTreeBuilder()
|
||||
{
|
||||
$treeBuilder = new TreeBuilder();
|
||||
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
|
||||
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
|
||||
if (method_exists($treeBuilder, 'getRootNode')) {
|
||||
$rootNode = $treeBuilder->getRootNode();
|
||||
} else {
|
||||
// BC for symfony/config < 4.2
|
||||
$rootNode = $treeBuilder->root('code_rhapsodie_dataflow');
|
||||
}
|
||||
|
||||
$rootNode
|
||||
->children()
|
||||
|
||||
@@ -68,8 +68,6 @@ class Job
|
||||
|
||||
/**
|
||||
* @var \DateTimeInterface|null
|
||||
*
|
||||
* @Asserts\DateTime()
|
||||
*/
|
||||
private $requestedDate;
|
||||
|
||||
@@ -99,8 +97,6 @@ class Job
|
||||
private $endTime;
|
||||
|
||||
/**
|
||||
* @param ScheduledDataflow $scheduled
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
|
||||
@@ -124,8 +120,7 @@ class Job
|
||||
{
|
||||
$lost = array_diff(static::KEYS, array_keys($datas));
|
||||
if (count($lost) > 0) {
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
|
||||
$lost).'"');
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
|
||||
}
|
||||
|
||||
$job = new self();
|
||||
@@ -161,11 +156,6 @@ class Job
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $id
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setId(int $id): Job
|
||||
{
|
||||
$this->id = $id;
|
||||
@@ -173,27 +163,16 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int|null
|
||||
*/
|
||||
public function getId(): ?int
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getStatus(): int
|
||||
{
|
||||
return $this->status;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $status
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setStatus(int $status): Job
|
||||
{
|
||||
$this->status = $status;
|
||||
@@ -201,19 +180,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string|null
|
||||
*/
|
||||
public function getLabel(): ?string
|
||||
{
|
||||
return $this->label;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $label
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setLabel(?string $label): Job
|
||||
{
|
||||
$this->label = $label;
|
||||
@@ -221,19 +192,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string|null
|
||||
*/
|
||||
public function getDataflowType(): ?string
|
||||
{
|
||||
return $this->dataflowType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $dataflowType
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setDataflowType(?string $dataflowType): Job
|
||||
{
|
||||
$this->dataflowType = $dataflowType;
|
||||
@@ -241,19 +204,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array|null
|
||||
*/
|
||||
public function getOptions(): ?array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array|null $options
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setOptions(?array $options): Job
|
||||
{
|
||||
$this->options = $options;
|
||||
@@ -261,19 +216,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface|null
|
||||
*/
|
||||
public function getRequestedDate(): ?\DateTimeInterface
|
||||
{
|
||||
return $this->requestedDate;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \DateTimeInterface|null $requestedDate
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
|
||||
{
|
||||
$this->requestedDate = $requestedDate;
|
||||
@@ -281,19 +228,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int|null
|
||||
*/
|
||||
public function getScheduledDataflowId(): ?int
|
||||
{
|
||||
return $this->scheduledDataflowId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $scheduledDataflowId
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
|
||||
{
|
||||
$this->scheduledDataflowId = $scheduledDataflowId;
|
||||
@@ -301,19 +240,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int|null
|
||||
*/
|
||||
public function getCount(): ?int
|
||||
{
|
||||
return $this->count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $count
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setCount(?int $count): Job
|
||||
{
|
||||
$this->count = $count;
|
||||
@@ -321,19 +252,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array|null
|
||||
*/
|
||||
public function getExceptions(): ?array
|
||||
{
|
||||
return $this->exceptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array|null $exceptions
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setExceptions(?array $exceptions): Job
|
||||
{
|
||||
$this->exceptions = $exceptions;
|
||||
@@ -341,19 +264,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface|null
|
||||
*/
|
||||
public function getStartTime(): ?\DateTimeInterface
|
||||
{
|
||||
return $this->startTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \DateTimeInterface|null $startTime
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setStartTime(?\DateTimeInterface $startTime): Job
|
||||
{
|
||||
$this->startTime = $startTime;
|
||||
@@ -361,19 +276,11 @@ class Job
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface|null
|
||||
*/
|
||||
public function getEndTime(): ?\DateTimeInterface
|
||||
{
|
||||
return $this->endTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \DateTimeInterface|null $endTime
|
||||
*
|
||||
* @return Job
|
||||
*/
|
||||
public function setEndTime(?\DateTimeInterface $endTime): Job
|
||||
{
|
||||
$this->endTime = $endTime;
|
||||
|
||||
@@ -73,8 +73,7 @@ class ScheduledDataflow
|
||||
{
|
||||
$lost = array_diff(static::KEYS, array_keys($datas));
|
||||
if (count($lost) > 0) {
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
|
||||
$lost).'"');
|
||||
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
|
||||
}
|
||||
|
||||
$scheduledDataflow = new self();
|
||||
@@ -103,11 +102,6 @@ class ScheduledDataflow
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $id
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setId(int $id): ScheduledDataflow
|
||||
{
|
||||
$this->id = $id;
|
||||
@@ -115,27 +109,16 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int|null
|
||||
*/
|
||||
public function getId(): ?int
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string|null
|
||||
*/
|
||||
public function getLabel(): ?string
|
||||
{
|
||||
return $this->label;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $label
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setLabel(?string $label): ScheduledDataflow
|
||||
{
|
||||
$this->label = $label;
|
||||
@@ -143,19 +126,11 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string|null
|
||||
*/
|
||||
public function getDataflowType(): ?string
|
||||
{
|
||||
return $this->dataflowType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $dataflowType
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setDataflowType(?string $dataflowType): ScheduledDataflow
|
||||
{
|
||||
$this->dataflowType = $dataflowType;
|
||||
@@ -163,19 +138,11 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array|null
|
||||
*/
|
||||
public function getOptions(): ?array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array|null $options
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setOptions(?array $options): ScheduledDataflow
|
||||
{
|
||||
$this->options = $options;
|
||||
@@ -183,19 +150,11 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string|null
|
||||
*/
|
||||
public function getFrequency(): ?string
|
||||
{
|
||||
return $this->frequency;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $frequency
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setFrequency(?string $frequency): ScheduledDataflow
|
||||
{
|
||||
$this->frequency = $frequency;
|
||||
@@ -203,19 +162,11 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \DateTimeInterface|null
|
||||
*/
|
||||
public function getNext(): ?\DateTimeInterface
|
||||
{
|
||||
return $this->next;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \DateTimeInterface|null $next
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setNext(?\DateTimeInterface $next): ScheduledDataflow
|
||||
{
|
||||
$this->next = $next;
|
||||
@@ -223,19 +174,11 @@ class ScheduledDataflow
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool|null
|
||||
*/
|
||||
public function getEnabled(): ?bool
|
||||
{
|
||||
return $this->enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool|null $enabled
|
||||
*
|
||||
* @return ScheduledDataflow
|
||||
*/
|
||||
public function setEnabled(?bool $enabled): ScheduledDataflow
|
||||
{
|
||||
$this->enabled = $enabled;
|
||||
|
||||
20
src/Event/CrEvent.php
Normal file
20
src/Event/CrEvent.php
Normal file
@@ -0,0 +1,20 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Event;
|
||||
|
||||
/*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
if (class_exists('Symfony\Contracts\EventDispatcher\Event')) {
|
||||
// For Symfony 5.0+
|
||||
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
|
||||
{
|
||||
}
|
||||
} else {
|
||||
// For Symfony 3.4 to 4.4
|
||||
abstract class CrEvent extends \Symfony\Component\EventDispatcher\Event
|
||||
{
|
||||
}
|
||||
}
|
||||
@@ -5,31 +5,25 @@ declare(strict_types=1);
|
||||
namespace CodeRhapsodie\DataflowBundle\Event;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use Symfony\Component\EventDispatcher\Event;
|
||||
|
||||
/**
|
||||
* Event used during the dataflow lifecycle.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ProcessingEvent extends Event
|
||||
class ProcessingEvent extends CrEvent
|
||||
{
|
||||
/** @var Job */
|
||||
private $job;
|
||||
|
||||
/**
|
||||
* ProcessingEvent constructor.
|
||||
*
|
||||
* @param Job $job
|
||||
*/
|
||||
public function __construct(Job $job)
|
||||
{
|
||||
$this->job = $job;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Job
|
||||
*/
|
||||
public function getJob(): Job
|
||||
{
|
||||
return $this->job;
|
||||
|
||||
12
src/Exceptions/UnsupportedItemTypeException.php
Normal file
12
src/Exceptions/UnsupportedItemTypeException.php
Normal file
@@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Exceptions;
|
||||
|
||||
/**
|
||||
* Exception thrown when a writer receives an item of an unsupported type.
|
||||
*/
|
||||
class UnsupportedItemTypeException extends \Exception
|
||||
{
|
||||
}
|
||||
@@ -4,10 +4,10 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Manager;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\Job;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
|
||||
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
|
||||
use Doctrine\DBAL\Driver\Connection;
|
||||
|
||||
/**
|
||||
@@ -53,9 +53,6 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
|
||||
$this->connection->commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param ScheduledDataflow $scheduled
|
||||
*/
|
||||
private function updateScheduledDataflowNext(ScheduledDataflow $scheduled): void
|
||||
{
|
||||
$interval = \DateInterval::createFromDateString($scheduled->getFrequency());
|
||||
@@ -70,9 +67,6 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
|
||||
$this->scheduledDataflowRepository->save($scheduled);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param ScheduledDataflow $scheduled
|
||||
*/
|
||||
private function createPendingForScheduled(ScheduledDataflow $scheduled): void
|
||||
{
|
||||
$this->jobRepository->save(Job::createFromScheduledDataflow($scheduled));
|
||||
|
||||
@@ -4,8 +4,8 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Registry;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
|
||||
|
||||
/**
|
||||
* Array based dataflow types registry.
|
||||
|
||||
@@ -13,10 +13,6 @@ interface DataflowTypeRegistryInterface
|
||||
{
|
||||
/**
|
||||
* Get a registered dataflow type from its FQCN or one of its aliases.
|
||||
*
|
||||
* @param string $fqcnOrAlias
|
||||
*
|
||||
* @return DataflowTypeInterface
|
||||
*/
|
||||
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface;
|
||||
|
||||
@@ -29,8 +25,6 @@ interface DataflowTypeRegistryInterface
|
||||
|
||||
/**
|
||||
* Registers a dataflow type.
|
||||
*
|
||||
* @param DataflowTypeInterface $dataflowType
|
||||
*/
|
||||
public function registerDataflowType(DataflowTypeInterface $dataflowType): void;
|
||||
}
|
||||
|
||||
@@ -41,8 +41,6 @@ class JobRepository
|
||||
|
||||
/**
|
||||
* JobRepository constructor.
|
||||
*
|
||||
* @param Connection $connection
|
||||
*/
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
|
||||
@@ -35,8 +35,6 @@ class ScheduledDataflowRepository
|
||||
|
||||
/**
|
||||
* JobRepository constructor.
|
||||
*
|
||||
* @param Connection $connection
|
||||
*/
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
|
||||
@@ -23,6 +23,7 @@ services:
|
||||
arguments:
|
||||
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
|
||||
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
|
||||
$logger: '@logger'
|
||||
tags: ['console.command']
|
||||
|
||||
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
|
||||
|
||||
@@ -45,12 +45,14 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Job $job
|
||||
*/
|
||||
private function beforeProcessing(Job $job): void
|
||||
{
|
||||
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
|
||||
// Symfony 3.4 to 4.4 call
|
||||
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
|
||||
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
|
||||
} else { // Symfony 5.0+ call
|
||||
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
|
||||
}
|
||||
|
||||
$job
|
||||
->setStatus(Job::STATUS_RUNNING)
|
||||
@@ -59,10 +61,6 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
$this->repository->save($job);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Job $job
|
||||
* @param Result $result
|
||||
*/
|
||||
private function afterProcessing(Job $job, Result $result): void
|
||||
{
|
||||
$exceptions = [];
|
||||
@@ -79,6 +77,11 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
|
||||
;
|
||||
$this->repository->save($job);
|
||||
|
||||
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
|
||||
// Symfony 3.4 to 4.4 call
|
||||
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
|
||||
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
|
||||
} else { // Symfony 5.0+ call
|
||||
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,10 +19,10 @@ class DataflowSchemaProvider
|
||||
{
|
||||
$schema = new Schema();
|
||||
$tableJob = $schema->createTable(JobRepository::TABLE_NAME);
|
||||
$tableJob->addColumn('id', 'integer', array(
|
||||
$tableJob->addColumn('id', 'integer', [
|
||||
'autoincrement' => true,
|
||||
));
|
||||
$tableJob->setPrimaryKey(array('id'));
|
||||
]);
|
||||
$tableJob->setPrimaryKey(['id']);
|
||||
|
||||
$tableJob->addColumn('scheduled_dataflow_id', 'integer', ['notnull' => false]);
|
||||
$tableJob->addColumn('status', 'integer', ['notnull' => true]);
|
||||
@@ -36,10 +36,10 @@ class DataflowSchemaProvider
|
||||
$tableJob->addColumn('end_time', 'datetime', ['notnull' => false]);
|
||||
|
||||
$tableSchedule = $schema->createTable(ScheduledDataflowRepository::TABLE_NAME);
|
||||
$tableSchedule->addColumn('id', 'integer', array(
|
||||
$tableSchedule->addColumn('id', 'integer', [
|
||||
'autoincrement' => true,
|
||||
));
|
||||
$tableSchedule->setPrimaryKey(array('id'));
|
||||
]);
|
||||
$tableSchedule->setPrimaryKey(['id']);
|
||||
$tableSchedule->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
|
||||
$tableSchedule->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
|
||||
$tableSchedule->addColumn('options', 'json', ['notnull' => true]);
|
||||
|
||||
Reference in New Issue
Block a user