11 Commits

Author SHA1 Message Date
jeremycr
a5518c80e2 Added more output when errors occured during execute command (#49)
* Added more output when errors occured during execute command

* Apply suggestions from code review

Co-Authored-By: jbcr <51637606+jbcr@users.noreply.github.com>
2020-02-27 09:57:42 +01:00
jeremycr
bd9171ad53 Added semi-colon after each query to ease copy-paste (#48) 2020-02-14 14:37:18 +01:00
jeremycr
5b4b3f1b6f Fixed the connection proxy class created by the factory (#47) 2020-01-30 14:37:47 +01:00
jbcr
d1330ae638 display errors catched during processing (#44)
* display errors catched during processing
* update changelog
2019-12-04 15:37:24 +01:00
jbcr
42b242ee6c catch all error (#43) 2019-12-04 15:37:00 +01:00
jeremycr
4d98adfe0a Updated composer aliases (#40) 2019-11-29 09:10:16 +01:00
jeremycr
c5fc6adf08 Passed PHP-CS-Fixer 2.16.1 (#39) 2019-11-28 14:38:16 +01:00
jeremycr
8efb4bd2d9 Removed unused dependency (#38) 2019-11-28 14:36:31 +01:00
jbcr
a9b19d933a Add Symfony 5.0 compatibility (#35)
* update travis config
* add doctrine dbal 2 in requirement
* remove php cs fixer. Use a global installed CS Fixer
* Add backward compatibility
* add coverage for SF5.0
* add tests
* code coverage on lowest version
* code coverage on 3.4 version
2019-11-26 14:25:05 +01:00
jeremycr
ca946429b1 Fixed next execution for scheduled dataflows not increasing (#36) 2019-11-22 14:12:04 +01:00
jeremycr
26ac98eb98 Added CollectionWriter and DelegatorWriter (#34)
* Added CollectionWriter and DelegatorWriter

* Added explanations and examples in README
2019-11-21 11:47:21 +01:00
36 changed files with 731 additions and 299 deletions

View File

@@ -30,6 +30,7 @@ matrix:
# Enable code coverage with the latest supported PHP version
- php: '7.3'
env:
- SYMFONY_VERSION=3.4.*
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
@@ -47,10 +48,15 @@ matrix:
- SYMFONY_VERSION=3.4.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
- SYMFONY_VERSION=4.3.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.3.*
- SYMFONY_VERSION=4.4.*
- php: '7.2'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.0.*
# Test unsupported versions of Symfony
- php: '7.1'
@@ -59,13 +65,16 @@ matrix:
- php: '7.1'
env:
- SYMFONY_VERSION=4.1.*
- php: '7.1'
env:
- SYMFONY_VERSION=4.2.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
- php: '7.1'
- php: '7.2'
env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
- SYMFONY_VERSION=5.1.*
# Test upcoming PHP versions with dev dependencies
- php: '7.4snapshot'
@@ -78,12 +87,14 @@ matrix:
- SYMFONY_VERSION=4.0.*
- env:
- SYMFONY_VERSION=4.1.*
- env:
- SYMFONY_VERSION=4.2.*
- env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
- env:
- STABILITY=dev
- SYMFONY_VERSION=4.4.*
- SYMFONY_VERSION=5.1.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
@@ -108,4 +119,4 @@ script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;
fi;

View File

@@ -1,3 +1,18 @@
# Version 2.1.0
* Added CollectionWriter and DelegatorWriter
* Adding Symfony 5.0 compatibility
* Save all exceptions caught in the log for `code-rhapsodie:dataflow:execute`
* Added more output when errors occured during `code-rhapsodie:dataflow:execute`
# Version 2.0.2
* Fixed the connection proxy class created by the factory
# Version 2.0.1
* Fixed next execution time not increasing for scheduled dataflows
# Version 2.0.0
* Add Doctrine DBAL multi-connection support

View File

@@ -386,6 +386,95 @@ class FileWriter implements WriterInterface
}
```
#### CollectionWriter
If you want to write multiple items from a single item read, you can use the generic `CollectionWriter`. This writer will iterate over any `iterable` it receives, and pass each item from that collection to your own writer that handles single items.
```php
$builder->addWriter(new CollectionWriter($mySingleItemWriter));
```
#### DelegatorWriter
If you want to call different writers depending on what item is read, you can use the generic `DelegatorWriter`.
As an example, let's suppose our items are arrays with the first entry being either `product` or `order`. We want to use a different writer based on that value.
First, create your writers implementing `DelegateWriterInterface` (this interface extends `WriterInterface` so your writers can still be used without the `DelegatorWriter`).
```php
<?php
namespace CodeRhapsodie\DataflowExemple\Writer;
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
class ProductWriter implements DelegateWriterInterface
{
public function supports($item): bool
{
return 'product' === reset($item);
}
public function prepare()
{
}
public function write($item)
{
// Process your product
}
public function finish()
{
}
}
```
```php
<?php
namespace CodeRhapsodie\DataflowExemple\Writer;
use CodeRhapsodie\DataFlowBundle\DataflowType\Writer\WriterInterface;
class OrderWriter implements DelegateWriterInterface
{
public function supports($item): bool
{
return 'order' === reset($item);
}
public function prepare()
{
}
public function write($item)
{
// Process your order
}
public function finish()
{
}
}
```
Then, configure your `DelegatorWriter` and add it to your dataflow type.
```php
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
// Snip add reader and steps
$delegatorWriter = new DelegatorWriter();
$delegatorWriter->addDelegate(new ProductWriter());
$delegatorWriter->addDelegate(new OrderWriter());
$builder->addWriter($delegatorWriter);
}
```
During execution, the `DelegatorWriter` will simply pass each item received to its first delegate (in the order those were added) that supports it. If no delegate supports an item, an exception will be thrown.
## Queue
All pending dataflow job processes are stored in a queue into the database.

View File

@@ -4,7 +4,6 @@ namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use PHPUnit\Framework\Constraint\IsIdentical;
use PHPUnit\Framework\TestCase;
use Symfony\Component\OptionsResolver\OptionsResolver;
@@ -15,18 +14,21 @@ class AbstractDataflowTypeTest extends TestCase
$label = 'Test label';
$options = ['testOption' => 'Test value'];
$values = [1, 2, 3];
$testCase = $this;
$dataflowType = new class($label, $options, $values) extends AbstractDataflowType
$dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType
{
private $label;
private $options;
private $values;
private $testCase;
public function __construct(string $label, array $options, array $values)
public function __construct(string $label, array $options, array $values, TestCase $testCase)
{
$this->label = $label;
$this->options = $options;
$this->values = $values;
$this->testCase = $testCase;
}
public function getLabel(): string
@@ -42,7 +44,7 @@ class AbstractDataflowTypeTest extends TestCase
protected function buildDataflow(DataflowBuilder $builder, array $options): void
{
$builder->setReader($this->values);
(new IsIdentical($this->options))->evaluate($options);
$this->testCase->assertSame($this->options, $options);
}
};

View File

@@ -0,0 +1,56 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\CollectionWriter;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
use PHPUnit\Framework\TestCase;
class CollectionWriterTest extends TestCase
{
public function testNotACollection()
{
$this->expectException(UnsupportedItemTypeException::class);
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
$writer->write('Not an iterable');
}
public function testSupports()
{
$writer = new CollectionWriter($this->createMock(WriterInterface::class));
$this->assertTrue($writer->supports([]));
$this->assertTrue($writer->supports(new \ArrayIterator([])));
$this->assertFalse($writer->supports(''));
$this->assertFalse($writer->supports(0));
}
public function testAll()
{
$values = ['a', 'b', 'c'];
$embeddedWriter = $this->createMock(WriterInterface::class);
$embeddedWriter
->expects($this->once())
->method('prepare')
;
$embeddedWriter
->expects($this->once())
->method('finish')
;
$embeddedWriter
->expects($this->exactly(count($values)))
->method('write')
->withConsecutive(...array_map(function ($item) {
return [$item];
}, $values))
;
$writer = new CollectionWriter($embeddedWriter);
$writer->prepare();
$writer->write($values);
$writer->finish();
}
}

View File

@@ -0,0 +1,160 @@
<?php
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class DelegatorWriterTest extends TestCase
{
/** @var DelegatorWriter */
private $delegatorWriter;
/** @var DelegateWriterInterface|MockObject */
private $delegateInt;
/** @var DelegateWriterInterface|MockObject */
private $delegateString;
/** @var DelegateWriterInterface|MockObject */
private $delegateArray;
protected function setUp(): void
{
$this->delegateInt = $this->createMock(DelegateWriterInterface::class);
$this->delegateInt->method('supports')->willReturnCallback(function ($argument) {
return is_int($argument);
});
$this->delegateString = $this->createMock(DelegateWriterInterface::class);
$this->delegateString->method('supports')->willReturnCallback(function ($argument) {
return is_string($argument);
});
$this->delegateArray = $this->createMock(DelegateWriterInterface::class);
$this->delegateArray->method('supports')->willReturnCallback(function ($argument) {
return is_array($argument);
});
$this->delegatorWriter = new DelegatorWriter();
$this->delegatorWriter->addDelegates([
$this->delegateInt,
$this->delegateString,
$this->delegateArray,
]);
}
public function testUnsupported()
{
$this->expectException(UnsupportedItemTypeException::class);
$this->delegatorWriter->write(new \stdClass());
}
public function testStopAtFirstSupportingDelegate()
{
$value = 0;
$this->delegateInt->expects($this->once())->method('supports');
$this->delegateInt
->expects($this->once())
->method('write')
->with($value)
;
$this->delegateString->expects($this->never())->method('supports');
$this->delegateArray->expects($this->never())->method('supports');
$this->delegateString->expects($this->never())->method('write');
$this->delegateArray->expects($this->never())->method('write');
$this->delegatorWriter->write($value);
}
public function testNotSupported()
{
$value = new \stdClass();
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->once())
->method('supports')
->with($value)
;
$this->assertFalse($this->delegatorWriter->supports($value));
}
public function testSupported()
{
$value = '';
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->never())
->method('supports')
;
$this->assertTrue($this->delegatorWriter->supports($value));
}
public function testAll()
{
$value = ['a'];
$this->delegateInt
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateString
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateArray
->expects($this->once())
->method('supports')
->with($value)
;
$this->delegateInt->expects($this->once())->method('prepare');
$this->delegateString->expects($this->once())->method('prepare');
$this->delegateArray->expects($this->once())->method('prepare');
$this->delegateInt->expects($this->once())->method('finish');
$this->delegateString->expects($this->once())->method('finish');
$this->delegateArray->expects($this->once())->method('finish');
$this->delegateInt->expects($this->never())->method('write');
$this->delegateString->expects($this->never())->method('write');
$this->delegateArray
->expects($this->once())
->method('write')
->with($value)
;
$this->delegatorWriter->prepare();
$this->delegatorWriter->write($value);
$this->delegatorWriter->finish();
}
}

View File

@@ -98,4 +98,44 @@ class ScheduledDataflowManagerTest extends TestCase
$this->assertEquals($next->add(\DateInterval::createFromDateString($frequency)), $scheduled2->getNext());
}
public function testCreateJobsFromScheduledDataflowsWithError()
{
$scheduled1 = new ScheduledDataflow();
$this->scheduledDataflowRepository
->expects($this->once())
->method('findReadyToRun')
->willReturn([$scheduled1])
;
$this->jobRepository
->expects($this->exactly(1))
->method('findPendingForScheduledDataflow')
->withConsecutive([$scheduled1])
->willThrowException(new \Exception())
;
$this->connection
->expects($this->once())
->method('beginTransaction')
;
$this->jobRepository
->expects($this->never())
->method('save')
;
$this->connection
->expects($this->never())
->method('commit')
;
$this->connection
->expects($this->once())
->method('rollBack')
;
$this->expectException(\Exception::class);
$this->manager->createJobsFromScheduledDataflows();
}
}

View File

@@ -58,36 +58,68 @@ class PendingDataflowRunnerTest extends TestCase
->willReturnOnConsecutiveCalls($job1, $job2, null)
;
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
)
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
})
],
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
})
]
);
} else { // Symfony 5.0+
$this->dispatcher
->expects($this->exactly(4))
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job1) {
return $event->getJob() === $job1;
}),
Events::AFTER_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job2) {
return $event->getJob() === $job2;
}),
Events::AFTER_PROCESSING,
]
);
}
$dataflowType1 = $this->createMock(DataflowTypeInterface::class);
$dataflowType2 = $this->createMock(DataflowTypeInterface::class);

View File

@@ -38,20 +38,18 @@
"require": {
"php": "^7.1",
"doctrine/dbal": "^2.0",
"seld/signal-handler": "^1.0",
"symfony/config": "^3.4||^4.0",
"symfony/console": "^3.4||^4.0",
"symfony/dependency-injection": "^3.4||^4.0",
"symfony/event-dispatcher": "^3.4||^4.0",
"symfony/http-kernel": "^3.4||^4.0",
"symfony/lock": "^3.4||^4.0",
"symfony/options-resolver": "^3.4||^4.0",
"symfony/validator": "^3.4||^4.0",
"symfony/yaml": "^3.4||^4.0",
"doctrine/doctrine-bundle": "^1.0"
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||^4.0||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0",
"doctrine/doctrine-bundle": "^1.0||^2.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.15",
"phpunit/phpunit": "^7||^8"
},
"suggest": {
@@ -62,7 +60,8 @@
},
"extra": {
"branch-alias": {
"dev-master": "2.x-dev",
"dev-master": "2.1.x-dev",
"dev-v2.0.x": "2.0.x-dev",
"dev-v1.x": "1.x-dev"
}
}

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
@@ -13,7 +14,6 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Validator\Validator\ValidatorInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore

View File

@@ -5,6 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
@@ -12,7 +13,6 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore

View File

@@ -4,13 +4,14 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* Runs one dataflow.
@@ -27,12 +28,16 @@ class ExecuteDataflowCommand extends Command
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
/** @var LoggerInterface */
private $logger;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory, LoggerInterface $logger)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
$this->logger = $logger;
}
/**
@@ -72,6 +77,17 @@ EOF
$output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
$output->writeln('Success: '.$result->getSuccessCount());
if ($result->hasErrors() > 0) {
$output->writeln('<error> Errors: '.$result->getErrorCount().' </error>');
$output->writeln('<error> Exceptions traces are available in the logs. </error>');
foreach ($result->getExceptions() as $e) {
$this->logger->error('Error during processing : '.$e->getMessage(), ['exception' => $e]);
}
return 1;
}
return 0;
}
}

View File

@@ -5,13 +5,13 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore

View File

@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface;
use CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface;
use Symfony\Component\Console\Command\Command;
@@ -11,7 +12,6 @@ use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* Runs dataflows according to user-defined schedule.

View File

@@ -4,13 +4,13 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore

View File

@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Command;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\SchemaProvider\DataflowSchemaProvider;
@@ -14,7 +15,6 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
/**
* @codeCoverageIgnore
@@ -93,7 +93,7 @@ class SchemaCommand extends Command
$io = new SymfonyStyle($input, $output);
$io->text('Execute these SQL Queries on your database:');
foreach ($sqls as $sql) {
$io->text($sql);
$io->text($sql.';');
}
}
}

View File

@@ -21,10 +21,6 @@ class Dataflow implements DataflowInterface
/** @var WriterInterface[] */
private $writers = [];
/**
* @param iterable $reader
* @param string|null $name
*/
public function __construct(iterable $reader, ?string $name)
{
$this->reader = $reader;
@@ -32,8 +28,6 @@ class Dataflow implements DataflowInterface
}
/**
* @param callable $step
*
* @return $this
*/
public function addStep(callable $step): self
@@ -44,8 +38,6 @@ class Dataflow implements DataflowInterface
}
/**
* @param WriterInterface $writer
*
* @return $this
*/
public function addWriter(WriterInterface $writer): self
@@ -71,7 +63,7 @@ class Dataflow implements DataflowInterface
foreach ($this->reader as $index => $item) {
try {
$this->processItem($item);
} catch (\Exception $e) {
} catch (\Throwable $e) {
$exceptions[$index] = $e;
}

View File

@@ -13,8 +13,6 @@ interface DataflowInterface
{
/**
* Processes the data.
*
* @return Result
*/
public function process(): Result;
}

View File

@@ -33,13 +33,6 @@ class Result
/** @var array */
private $exceptions;
/**
* @param string $name
* @param \DateTimeInterface $startTime
* @param \DateTimeInterface $endTime
* @param int $totalCount
* @param \SplObjectStorage $exceptions
*/
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
{
$this->name = $name;
@@ -52,73 +45,46 @@ class Result
$this->exceptions = $exceptions;
}
/**
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @return \DateTimeInterface
*/
public function getStartTime(): \DateTimeInterface
{
return $this->startTime;
}
/**
* @return \DateTimeInterface
*/
public function getEndTime(): \DateTimeInterface
{
return $this->endTime;
}
/**
* @return \DateInterval
*/
public function getElapsed(): \DateInterval
{
return $this->elapsed;
}
/**
* @return int
*/
public function getErrorCount(): int
{
return $this->errorCount;
}
/**
* @return int
*/
public function getSuccessCount(): int
{
return $this->successCount;
}
/**
* @return int
*/
public function getTotalProcessedCount(): int
{
return $this->totalProcessedCount;
}
/**
* @return bool
*/
public function hasErrors(): bool
{
return $this->errorCount > 0;
}
/**
* @return array
*/
public function getExceptions(): array
{
return $this->exceptions;

View File

@@ -0,0 +1,62 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
/**
* Delegates the writing of each item in a collection to an embedded writer.
*/
class CollectionWriter implements DelegateWriterInterface
{
/** @var WriterInterface */
private $writer;
/**
* CollectionWriter constructor.
*/
public function __construct(WriterInterface $writer)
{
$this->writer = $writer;
}
/**
* {@inheritdoc}
*/
public function prepare()
{
$this->writer->prepare();
}
/**
* {@inheritdoc}
*/
public function write($collection)
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', is_object($collection) ? get_class($collection) : gettype($collection)));
}
foreach ($collection as $item) {
$this->writer->write($item);
}
}
/**
* {@inheritdoc}
*/
public function finish()
{
$this->writer->finish();
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
return is_iterable($item);
}
}

View File

@@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
/**
* A writer that can be used as a delegate of DelegatorWriter.
*/
interface DelegateWriterInterface extends WriterInterface
{
/**
* Returns true if the argument is of a supported type.
*
* @param $item
*/
public function supports($item): bool;
}

View File

@@ -0,0 +1,96 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
/**
* Writer that delegated the actual writing to other writers.
*/
class DelegatorWriter implements DelegateWriterInterface
{
/** @var DelegateWriterInterface[] */
private $delegates;
/**
* DelegatorWriter constructor.
*/
public function __construct()
{
$this->delegates = [];
}
/**
* {@inheritdoc}
*/
public function prepare()
{
foreach ($this->delegates as $delegate) {
$delegate->prepare();
}
}
/**
* {@inheritdoc}
*/
public function write($item)
{
foreach ($this->delegates as $delegate) {
if (!$delegate->supports($item)) {
continue;
}
$delegate->write($item);
return;
}
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', is_object($item) ? get_class($item) : gettype($item)));
}
/**
* {@inheritdoc}
*/
public function finish()
{
foreach ($this->delegates as $delegate) {
$delegate->finish();
}
}
/**
* {@inheritdoc}
*/
public function supports($item): bool
{
foreach ($this->delegates as $delegate) {
if ($delegate->supports($item)) {
return true;
}
}
return false;
}
/**
* Registers a collection of delegates.
*
* @param iterable|DelegateWriterInterface[] $delegates
*/
public function addDelegates(iterable $delegates): void
{
foreach ($delegates as $delegate) {
$this->addDelegate($delegate);
}
}
/**
* Registers one delegate.
*/
public function addDelegate(DelegateWriterInterface $delegate): void
{
$this->delegates[] = $delegate;
}
}

View File

@@ -4,11 +4,25 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
/**
* Represents a writer for dataflows.
*/
interface WriterInterface
{
/**
* Called before the dataflow is processed.
*/
public function prepare();
/**
* Write an item.
*
* @param mixed $item
*/
public function write($item);
/**
* Called after the dataflow is processed.
*/
public function finish();
}

View File

@@ -99,8 +99,6 @@ class Job
private $endTime;
/**
* @param ScheduledDataflow $scheduled
*
* @return Job
*/
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
@@ -124,8 +122,7 @@ class Job
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
$lost).'"');
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$job = new self();
@@ -161,11 +158,6 @@ class Job
];
}
/**
* @param int $id
*
* @return Job
*/
public function setId(int $id): Job
{
$this->id = $id;
@@ -173,27 +165,16 @@ class Job
return $this;
}
/**
* @return int|null
*/
public function getId(): ?int
{
return $this->id;
}
/**
* @return int
*/
public function getStatus(): int
{
return $this->status;
}
/**
* @param int $status
*
* @return Job
*/
public function setStatus(int $status): Job
{
$this->status = $status;
@@ -201,19 +182,11 @@ class Job
return $this;
}
/**
* @return string|null
*/
public function getLabel(): ?string
{
return $this->label;
}
/**
* @param string|null $label
*
* @return Job
*/
public function setLabel(?string $label): Job
{
$this->label = $label;
@@ -221,19 +194,11 @@ class Job
return $this;
}
/**
* @return string|null
*/
public function getDataflowType(): ?string
{
return $this->dataflowType;
}
/**
* @param string|null $dataflowType
*
* @return Job
*/
public function setDataflowType(?string $dataflowType): Job
{
$this->dataflowType = $dataflowType;
@@ -241,19 +206,11 @@ class Job
return $this;
}
/**
* @return array|null
*/
public function getOptions(): ?array
{
return $this->options;
}
/**
* @param array|null $options
*
* @return Job
*/
public function setOptions(?array $options): Job
{
$this->options = $options;
@@ -261,19 +218,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getRequestedDate(): ?\DateTimeInterface
{
return $this->requestedDate;
}
/**
* @param \DateTimeInterface|null $requestedDate
*
* @return Job
*/
public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
{
$this->requestedDate = $requestedDate;
@@ -281,19 +230,11 @@ class Job
return $this;
}
/**
* @return int|null
*/
public function getScheduledDataflowId(): ?int
{
return $this->scheduledDataflowId;
}
/**
* @param int|null $scheduledDataflowId
*
* @return Job
*/
public function setScheduledDataflowId(?int $scheduledDataflowId): Job
{
$this->scheduledDataflowId = $scheduledDataflowId;
@@ -301,19 +242,11 @@ class Job
return $this;
}
/**
* @return int|null
*/
public function getCount(): ?int
{
return $this->count;
}
/**
* @param int|null $count
*
* @return Job
*/
public function setCount(?int $count): Job
{
$this->count = $count;
@@ -321,19 +254,11 @@ class Job
return $this;
}
/**
* @return array|null
*/
public function getExceptions(): ?array
{
return $this->exceptions;
}
/**
* @param array|null $exceptions
*
* @return Job
*/
public function setExceptions(?array $exceptions): Job
{
$this->exceptions = $exceptions;
@@ -341,19 +266,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getStartTime(): ?\DateTimeInterface
{
return $this->startTime;
}
/**
* @param \DateTimeInterface|null $startTime
*
* @return Job
*/
public function setStartTime(?\DateTimeInterface $startTime): Job
{
$this->startTime = $startTime;
@@ -361,19 +278,11 @@ class Job
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getEndTime(): ?\DateTimeInterface
{
return $this->endTime;
}
/**
* @param \DateTimeInterface|null $endTime
*
* @return Job
*/
public function setEndTime(?\DateTimeInterface $endTime): Job
{
$this->endTime = $endTime;

View File

@@ -73,8 +73,7 @@ class ScheduledDataflow
{
$lost = array_diff(static::KEYS, array_keys($datas));
if (count($lost) > 0) {
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ',
$lost).'"');
throw new \LogicException('The first argument of '.__METHOD__.' must be contains: "'.implode(', ', $lost).'"');
}
$scheduledDataflow = new self();
@@ -103,11 +102,6 @@ class ScheduledDataflow
];
}
/**
* @param int $id
*
* @return ScheduledDataflow
*/
public function setId(int $id): ScheduledDataflow
{
$this->id = $id;
@@ -115,27 +109,16 @@ class ScheduledDataflow
return $this;
}
/**
* @return int|null
*/
public function getId(): ?int
{
return $this->id;
}
/**
* @return string|null
*/
public function getLabel(): ?string
{
return $this->label;
}
/**
* @param string|null $label
*
* @return ScheduledDataflow
*/
public function setLabel(?string $label): ScheduledDataflow
{
$this->label = $label;
@@ -143,19 +126,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return string|null
*/
public function getDataflowType(): ?string
{
return $this->dataflowType;
}
/**
* @param string|null $dataflowType
*
* @return ScheduledDataflow
*/
public function setDataflowType(?string $dataflowType): ScheduledDataflow
{
$this->dataflowType = $dataflowType;
@@ -163,19 +138,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return array|null
*/
public function getOptions(): ?array
{
return $this->options;
}
/**
* @param array|null $options
*
* @return ScheduledDataflow
*/
public function setOptions(?array $options): ScheduledDataflow
{
$this->options = $options;
@@ -183,19 +150,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return string|null
*/
public function getFrequency(): ?string
{
return $this->frequency;
}
/**
* @param string|null $frequency
*
* @return ScheduledDataflow
*/
public function setFrequency(?string $frequency): ScheduledDataflow
{
$this->frequency = $frequency;
@@ -203,19 +162,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return \DateTimeInterface|null
*/
public function getNext(): ?\DateTimeInterface
{
return $this->next;
}
/**
* @param \DateTimeInterface|null $next
*
* @return ScheduledDataflow
*/
public function setNext(?\DateTimeInterface $next): ScheduledDataflow
{
$this->next = $next;
@@ -223,19 +174,11 @@ class ScheduledDataflow
return $this;
}
/**
* @return bool|null
*/
public function getEnabled(): ?bool
{
return $this->enabled;
}
/**
* @param bool|null $enabled
*
* @return ScheduledDataflow
*/
public function setEnabled(?bool $enabled): ScheduledDataflow
{
$this->enabled = $enabled;

20
src/Event/CrEvent.php Normal file
View File

@@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Event;
/*
* @codeCoverageIgnore
*/
if (class_exists('Symfony\Contracts\EventDispatcher\Event')) {
// For Symfony 5.0+
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
{
}
} else {
// For Symfony 3.4 to 4.4
abstract class CrEvent extends \Symfony\Component\EventDispatcher\Event
{
}
}

View File

@@ -5,31 +5,25 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Event;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use Symfony\Component\EventDispatcher\Event;
/**
* Event used during the dataflow lifecycle.
*
* @codeCoverageIgnore
*/
class ProcessingEvent extends Event
class ProcessingEvent extends CrEvent
{
/** @var Job */
private $job;
/**
* ProcessingEvent constructor.
*
* @param Job $job
*/
public function __construct(Job $job)
{
$this->job = $job;
}
/**
* @return Job
*/
public function getJob(): Job
{
return $this->job;

View File

@@ -0,0 +1,12 @@
<?php
declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Exceptions;
/**
* Exception thrown when a writer receives an item of an unsupported type.
*/
class UnsupportedItemTypeException extends \Exception
{
}

View File

@@ -4,10 +4,10 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Manager;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
/**
@@ -53,9 +53,6 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
$this->connection->commit();
}
/**
* @param ScheduledDataflow $scheduled
*/
private function updateScheduledDataflowNext(ScheduledDataflow $scheduled): void
{
$interval = \DateInterval::createFromDateString($scheduled->getFrequency());
@@ -70,9 +67,6 @@ class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
$this->scheduledDataflowRepository->save($scheduled);
}
/**
* @param ScheduledDataflow $scheduled
*/
private function createPendingForScheduled(ScheduledDataflow $scheduled): void
{
$this->jobRepository->save(Job::createFromScheduledDataflow($scheduled));

View File

@@ -4,8 +4,8 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Registry;
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface;
use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
/**
* Array based dataflow types registry.

View File

@@ -13,10 +13,6 @@ interface DataflowTypeRegistryInterface
{
/**
* Get a registered dataflow type from its FQCN or one of its aliases.
*
* @param string $fqcnOrAlias
*
* @return DataflowTypeInterface
*/
public function getDataflowType(string $fqcnOrAlias): DataflowTypeInterface;
@@ -29,8 +25,6 @@ interface DataflowTypeRegistryInterface
/**
* Registers a dataflow type.
*
* @param DataflowTypeInterface $dataflowType
*/
public function registerDataflowType(DataflowTypeInterface $dataflowType): void;
}

View File

@@ -41,8 +41,6 @@ class JobRepository
/**
* JobRepository constructor.
*
* @param Connection $connection
*/
public function __construct(Connection $connection)
{

View File

@@ -35,8 +35,6 @@ class ScheduledDataflowRepository
/**
* JobRepository constructor.
*
* @param Connection $connection
*/
public function __construct(Connection $connection)
{

View File

@@ -23,6 +23,7 @@ services:
arguments:
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
$logger: '@logger'
tags: ['console.command']
CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
@@ -61,7 +62,7 @@ services:
coderhapsodie.dataflow.connection.internal:
lazy: true
class: Doctrine\DBAL\Driver\Connection
class: Doctrine\DBAL\Connection
factory: ['@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory', 'getConnection']
CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory:

View File

@@ -45,12 +45,14 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
}
}
/**
* @param Job $job
*/
private function beforeProcessing(Job $job): void
{
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
}
$job
->setStatus(Job::STATUS_RUNNING)
@@ -59,10 +61,6 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
$this->repository->save($job);
}
/**
* @param Job $job
* @param Result $result
*/
private function afterProcessing(Job $job, Result $result): void
{
$exceptions = [];
@@ -79,6 +77,11 @@ class PendingDataflowRunner implements PendingDataflowRunnerInterface
;
$this->repository->save($job);
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
}
}

View File

@@ -19,10 +19,10 @@ class DataflowSchemaProvider
{
$schema = new Schema();
$tableJob = $schema->createTable(JobRepository::TABLE_NAME);
$tableJob->addColumn('id', 'integer', array(
$tableJob->addColumn('id', 'integer', [
'autoincrement' => true,
));
$tableJob->setPrimaryKey(array('id'));
]);
$tableJob->setPrimaryKey(['id']);
$tableJob->addColumn('scheduled_dataflow_id', 'integer', ['notnull' => false]);
$tableJob->addColumn('status', 'integer', ['notnull' => true]);
@@ -36,10 +36,10 @@ class DataflowSchemaProvider
$tableJob->addColumn('end_time', 'datetime', ['notnull' => false]);
$tableSchedule = $schema->createTable(ScheduledDataflowRepository::TABLE_NAME);
$tableSchedule->addColumn('id', 'integer', array(
$tableSchedule->addColumn('id', 'integer', [
'autoincrement' => true,
));
$tableSchedule->setPrimaryKey(array('id'));
]);
$tableSchedule->setPrimaryKey(['id']);
$tableSchedule->addColumn('label', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('dataflow_type', 'string', ['notnull' => true, 'length' => 255]);
$tableSchedule->addColumn('options', 'json', ['notnull' => true]);