mirror of
https://github.com/code-rhapsodie/dataflow-bundle.git
synced 2026-03-24 06:42:23 +01:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c80b3c51b | ||
|
|
23801f8785 | ||
|
|
689d79535c |
16
README.md
16
README.md
@@ -1,4 +1,4 @@
|
||||
# Code-Rhapsodie Dataflow Bundle
|
||||
# Code Rhapsodie Dataflow Bundle
|
||||
|
||||
DataflowBundle is a bundle for Symfony 3.4+
|
||||
providing an easy way to create import / export dataflow.
|
||||
@@ -7,6 +7,20 @@ providing an easy way to create import / export dataflow.
|
||||
|
||||
[](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
|
||||
|
||||
Dataflow uses a linear generic workflow in three parts:
|
||||
* one reader
|
||||
* any number of steps
|
||||
* one or more writers
|
||||
|
||||
The reader can read data from anywhere and return data row by row. Each step processes the current row data.
|
||||
The steps are executed in the order in which they are added.
|
||||
And, one or more writers save the row anywhere you want.
|
||||
|
||||
As the following schema shows, you can define more than one dataflow:
|
||||
|
||||

|
||||
|
||||
|
||||
# Features
|
||||
|
||||
* Define and configure a Dataflow
|
||||
|
||||
53
Tests/DataflowType/AbstractDataflowTypeTest.php
Normal file
53
Tests/DataflowType/AbstractDataflowTypeTest.php
Normal file
@@ -0,0 +1,53 @@
|
||||
<?php
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
|
||||
use PHPUnit\Framework\Constraint\IsIdentical;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
class AbstractDataflowTypeTest extends TestCase
|
||||
{
|
||||
public function testProcess()
|
||||
{
|
||||
$label = 'Test label';
|
||||
$options = ['testOption' => 'Test value'];
|
||||
$values = [1, 2, 3];
|
||||
|
||||
$dataflowType = new class($label, $options, $values) extends AbstractDataflowType
|
||||
{
|
||||
private $label;
|
||||
private $options;
|
||||
private $values;
|
||||
|
||||
public function __construct(string $label, array $options, array $values)
|
||||
{
|
||||
$this->label = $label;
|
||||
$this->options = $options;
|
||||
$this->values = $values;
|
||||
}
|
||||
|
||||
public function getLabel(): string
|
||||
{
|
||||
return $this->label;
|
||||
}
|
||||
|
||||
protected function configureOptions(OptionsResolver $optionsResolver): void
|
||||
{
|
||||
$optionsResolver->setDefined('testOption');
|
||||
}
|
||||
|
||||
protected function buildDataflow(DataflowBuilder $builder, array $options): void
|
||||
{
|
||||
$builder->setReader($this->values);
|
||||
(new IsIdentical($this->options))->evaluate($options);
|
||||
}
|
||||
};
|
||||
|
||||
$result = $dataflowType->process($options);
|
||||
$this->assertSame($label, $result->getName());
|
||||
$this->assertSame(count($values), $result->getTotalProcessedCount());
|
||||
}
|
||||
}
|
||||
37
Tests/DataflowType/Writer/PortWriterAdapterTest.php
Normal file
37
Tests/DataflowType/Writer/PortWriterAdapterTest.php
Normal file
@@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\Tests\DataflowType\Writer;
|
||||
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
class PortWriterAdapterTest extends TestCase
|
||||
{
|
||||
public function testAll()
|
||||
{
|
||||
$value = 'not an array';
|
||||
|
||||
$writer = $this->getMockBuilder('\Port\Writer')
|
||||
->setMethods(['prepare', 'finish', 'writeItem'])
|
||||
->getMock()
|
||||
;
|
||||
$writer
|
||||
->expects($this->once())
|
||||
->method('prepare')
|
||||
;
|
||||
$writer
|
||||
->expects($this->once())
|
||||
->method('finish')
|
||||
;
|
||||
$writer
|
||||
->expects($this->once())
|
||||
->method('writeItem')
|
||||
->with([$value])
|
||||
;
|
||||
|
||||
$adapter = new PortWriterAdapter($writer);
|
||||
$adapter->prepare();
|
||||
$adapter->write($value);
|
||||
$adapter->finish();
|
||||
}
|
||||
}
|
||||
@@ -103,10 +103,8 @@ class PendingDataflowRunnerTest extends TestCase
|
||||
->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
|
||||
;
|
||||
|
||||
$bag1 = new \SplObjectStorage();
|
||||
$bag1->attach(new \Exception('message1'));
|
||||
$bag2 = new \SplObjectStorage();
|
||||
$bag2->attach(new \Exception('message2'));
|
||||
$bag1 = [new \Exception('message1')];
|
||||
$bag2 = [new \Exception('message2')];
|
||||
|
||||
$result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
|
||||
$result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
|
||||
|
||||
@@ -47,7 +47,8 @@
|
||||
"symfony/lock": "^3.4||^4.0",
|
||||
"symfony/options-resolver": "^3.4||^4.0",
|
||||
"symfony/validator": "^3.4||^4.0",
|
||||
"symfony/yaml": "^3.4||^4.0"
|
||||
"symfony/yaml": "^3.4||^4.0",
|
||||
"doctrine/doctrine-bundle": "^1.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"friendsofphp/php-cs-fixer": "^2.15",
|
||||
|
||||
@@ -9,6 +9,9 @@ use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompil
|
||||
use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\HttpKernel\Bundle\Bundle;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class CodeRhapsodieDataflowBundle extends Bundle
|
||||
{
|
||||
protected $name = 'CodeRhapsodieDataflowBundle';
|
||||
|
||||
@@ -14,6 +14,9 @@ use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
use Symfony\Component\Validator\Validator\ValidatorInterface;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class AddScheduledDataflowCommand extends Command
|
||||
{
|
||||
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
|
||||
|
||||
@@ -13,6 +13,9 @@ use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ChangeScheduleStatusCommand extends Command
|
||||
{
|
||||
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
|
||||
|
||||
@@ -12,6 +12,8 @@ use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
/**
|
||||
* Runs one dataflow.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ExecuteDataflowCommand extends Command
|
||||
{
|
||||
|
||||
@@ -12,6 +12,9 @@ use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class JobShowCommand extends Command
|
||||
{
|
||||
private const STATUS_MAPPING = [
|
||||
|
||||
@@ -13,6 +13,8 @@ use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
/**
|
||||
* Runs dataflows according to user-defined schedule.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class RunPendingDataflowsCommand extends Command
|
||||
{
|
||||
|
||||
@@ -10,6 +10,9 @@ use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ScheduleListCommand extends Command
|
||||
{
|
||||
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
|
||||
|
||||
@@ -8,6 +8,9 @@ use Symfony\Component\OptionsResolver\OptionsResolver;
|
||||
|
||||
abstract class AbstractDataflowType implements DataflowTypeInterface
|
||||
{
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public function getAliases(): iterable
|
||||
{
|
||||
return [];
|
||||
@@ -28,6 +31,9 @@ abstract class AbstractDataflowType implements DataflowTypeInterface
|
||||
return $dataflow->process();
|
||||
}
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
protected function configureOptions(OptionsResolver $optionsResolver): void
|
||||
{
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Result;
|
||||
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
|
||||
use CodeRhapsodie\DataflowBundle\Exceptions\InterruptedProcessingException;
|
||||
use Seld\Signal\SignalHandler;
|
||||
|
||||
class Dataflow implements DataflowInterface
|
||||
{
|
||||
@@ -63,13 +62,9 @@ class Dataflow implements DataflowInterface
|
||||
public function process(): Result
|
||||
{
|
||||
$count = 0;
|
||||
$exceptions = new \SplObjectStorage();
|
||||
$exceptions = [];
|
||||
$startTime = new \DateTime();
|
||||
|
||||
SignalHandler::create(['SIGTERM', 'SIGINT'], function () {
|
||||
throw new InterruptedProcessingException();
|
||||
});
|
||||
|
||||
foreach ($this->writers as $writer) {
|
||||
$writer->prepare();
|
||||
}
|
||||
@@ -78,7 +73,7 @@ class Dataflow implements DataflowInterface
|
||||
try {
|
||||
$this->processItem($item);
|
||||
} catch (\Exception $e) {
|
||||
$exceptions->attach($e, $index);
|
||||
$exceptions[$index] = $e;
|
||||
}
|
||||
|
||||
++$count;
|
||||
|
||||
@@ -4,6 +4,9 @@ declare(strict_types=1);
|
||||
|
||||
namespace CodeRhapsodie\DataflowBundle\DataflowType;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class Result
|
||||
{
|
||||
/** @var string */
|
||||
@@ -27,7 +30,7 @@ class Result
|
||||
/** @var int */
|
||||
private $totalProcessedCount = 0;
|
||||
|
||||
/** @var \SplObjectStorage */
|
||||
/** @var array */
|
||||
private $exceptions;
|
||||
|
||||
/**
|
||||
@@ -37,7 +40,7 @@ class Result
|
||||
* @param int $totalCount
|
||||
* @param \SplObjectStorage $exceptions
|
||||
*/
|
||||
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, \SplObjectStorage $exceptions)
|
||||
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
|
||||
{
|
||||
$this->name = $name;
|
||||
$this->startTime = $startTime;
|
||||
@@ -114,9 +117,9 @@ class Result
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \SplObjectStorage
|
||||
* @return array
|
||||
*/
|
||||
public function getExceptions(): \SplObjectStorage
|
||||
public function getExceptions(): array
|
||||
{
|
||||
return $this->exceptions;
|
||||
}
|
||||
|
||||
@@ -10,6 +10,9 @@ use Symfony\Component\DependencyInjection\ContainerBuilder;
|
||||
use Symfony\Component\DependencyInjection\Extension\Extension;
|
||||
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
|
||||
|
||||
/**
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class CodeRhapsodieDataflowExtension extends Extension
|
||||
{
|
||||
public function load(array $configs, ContainerBuilder $container)
|
||||
|
||||
@@ -11,6 +11,8 @@ use Symfony\Component\DependencyInjection\Reference;
|
||||
|
||||
/**
|
||||
* Registers dataflow types in the registry.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class DataflowTypeCompilerPass implements CompilerPassInterface
|
||||
{
|
||||
|
||||
@@ -11,6 +11,8 @@ use Doctrine\ORM\Mapping as ORM;
|
||||
*
|
||||
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\JobRepository")
|
||||
* @ORM\Table(name="cr_dataflow_job")
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class Job
|
||||
{
|
||||
|
||||
@@ -12,6 +12,8 @@ use Doctrine\ORM\Mapping as ORM;
|
||||
*
|
||||
* @ORM\Entity(repositoryClass="CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository")
|
||||
* @ORM\Table(name="cr_dataflow_scheduled")
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ScheduledDataflow
|
||||
{
|
||||
|
||||
@@ -9,6 +9,8 @@ use Symfony\Component\EventDispatcher\Event;
|
||||
|
||||
/**
|
||||
* Event used during the dataflow lifecycle.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ProcessingEvent extends Event
|
||||
{
|
||||
|
||||
@@ -11,6 +11,8 @@ use Doctrine\ORM\EntityRepository;
|
||||
|
||||
/**
|
||||
* Repository.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class JobRepository extends EntityRepository
|
||||
{
|
||||
|
||||
@@ -10,6 +10,8 @@ use Doctrine\ORM\EntityRepository;
|
||||
|
||||
/**
|
||||
* Repository for the ScheduledDataflow entity.
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class ScheduledDataflowRepository extends EntityRepository
|
||||
{
|
||||
|
||||
BIN
src/Resources/doc/schema.png
Normal file
BIN
src/Resources/doc/schema.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 130 KiB |
@@ -8,6 +8,8 @@ use Symfony\Component\Validator\Constraint;
|
||||
|
||||
/**
|
||||
* @Annotation
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
class Frequency extends Constraint
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user