11 Commits

Author SHA1 Message Date
AUDUL
9d76b45771 Added custom index for job status (#78)
* Added custom index for job status
2025-07-04 09:26:13 +02:00
Matt Mankins
db37c4bdd1 Update Kudos Github Action to support generation from source repo only (#71)
* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update semicolons-kudos.yaml

* Update GitHub Action workflow for Semicolons Kudos Action

* Update GitHub Action workflow for Semicolons Kudos Action

---------

Co-authored-by: semicolons-for-kudos[bot] <145267638+semicolons-for-kudos[bot]@users.noreply.github.com>
2023-12-27 17:29:46 +01:00
Olivier PORTIER
f20cd96ec5 Update semicolons-kudos.yaml 2023-12-20 11:49:50 +01:00
Olivier PORTIER
fd2c6aaab5 Update semicolons-kudos.yaml (#70) 2023-12-20 11:36:51 +01:00
Olivier PORTIER
4efd310a6e Initiate Kudos on dataflow-bundle by creating new file semicolons-kudos.yaml (#69) 2023-12-20 11:11:26 +01:00
Jérémy J
cec42a3337 Fix log exception argument typing 2023-12-06 13:56:16 +01:00
jbcr
d440ad008b add sonar config 2023-11-16 16:50:24 +01:00
jeremycr
e8b362526a Fix DBAL 2.12 compatibility break (#68) 2023-07-27 16:47:50 +02:00
jeremycr
3c56a90a93 Added the possibility to define a custom item index for exception logs (#66) 2023-07-27 09:38:22 +02:00
jeremycr
1b2b1be958 Removed travis for now, to be replaced by github actions in the future (#67) 2023-07-27 09:29:34 +02:00
jeremycr
25b2e9ec0f Upgrade for Symfony 6 (#65)
* Upgrade for Symfony 6
2022-08-18 09:36:43 +02:00
57 changed files with 337 additions and 709 deletions

View File

@@ -1,3 +0,0 @@
service_name : travis-ci
coverage_clover: var/build/clover.xml
json_path : var/build/upload.json

27
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: Build
on:
push:
branches:
- master
jobs:
build:
name: Build
runs-on: ubuntu-latest
permissions: read-all
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- uses: sonarsource/sonarqube-scan-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
# If you wish to fail your job when the Quality Gate is red, uncomment the
# following lines. This would typically be used to fail a deployment.
# - uses: sonarsource/sonarqube-quality-gate-action@master
# timeout-minutes: 5
# env:
# SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

26
.github/workflows/semicolons-kudos.yaml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Kudos for Code
on:
push:
branches: ["master"]
workflow_dispatch:
jobs:
kudos:
name: Semicolons Kudos
permissions: write-all
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: LoremLabs/kudos-for-code-action@latest
with:
search-dir: "."
destination: "artifact"
generate-nomerges: true
generate-validemails: true
generate-limitdepth: 0
generate-fromrepo: true
analyze-repo: false
skip-ids: ""

View File

@@ -1,135 +0,0 @@
language: php
sudo: false
cache:
directories:
- $HOME/.composer/cache
branches:
only:
- master
- /^\d+\.\d+$/
- travis-setup
env:
global:
- SYMFONY_DEPRECATIONS_HELPER="max[self]=0"
- PHPUNIT_FLAGS="-v"
- PHPUNIT_ENABLED="true"
- STABILITY=stable
- COVERALLS_ENABLED="false"
matrix:
fast_finish: true
include:
- php: '7.3'
- php: '7.4'
- php: '8.0'
# Enable code coverage with the previous supported PHP version
# - php: '7.4'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Enable code coverage with the latest supported PHP version
# - php: '8.0'
# env:
# - SYMFONY_VERSION=3.4.*
# - COVERALLS_ENABLED="true"
# - PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
# Minimum supported dependencies with the latest and oldest supported PHP versions
- php: '7.3'
env:
- COMPOSER_FLAGS="--prefer-lowest"
# Incompatibility between lowest symfony testing utils and phpunit
# - php: '8.0'
# env:
# - COMPOSER_FLAGS="--prefer-lowest"
# Test each supported Symfony version with lowest supported PHP version
# - php: '7.3'
# env:
# - SYMFONY_VERSION=3.4.*
- php: '7.3'
env:
- SYMFONY_VERSION=4.4.*
- php: '7.3'
env:
- COVERALLS_ENABLED="true"
- PHPUNIT_FLAGS="-v --coverage-text --coverage-clover var/build/clover.xml"
- SYMFONY_VERSION=5.2.*
# Test unsupported versions of Symfony
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.1.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.2.*
# - php: '7.3'
# env:
# - SYMFONY_VERSION=4.3.*
- php: '7.3'
env:
- SYMFONY_VERSION=5.0.*
- php: '7.3'
env:
- SYMFONY_VERSION=5.1.*
# Test upcoming Symfony versions with lowest supported PHP version and dev dependencies
# - php: '7.2'
# env:
# - STABILITY=dev
# - SYMFONY_VERSION=5.3.*
# Test upcoming PHP versions with dev dependencies
#- php: '7.5snapshot'
# env:
# - STABILITY=dev
# - COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
allow_failures:
# 4.0 not supported because of https://github.com/advisories/GHSA-pgwj-prpq-jpc2
- env:
- SYMFONY_VERSION=4.0.*
- env:
- SYMFONY_VERSION=4.1.*
- env:
- SYMFONY_VERSION=4.2.*
- env:
- STABILITY=dev
- COMPOSER_FLAGS="--ignore-platform-reqs --prefer-stable"
- env:
- STABILITY=dev
- SYMFONY_VERSION=5.1.*
- env:
- STABILITY=dev
- SYMFONY_VERSION=5.2.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
travis_retry composer global require "symfony/flex:^1.4";
composer config extra.symfony.require $SYMFONY_VERSION;
fi
- if [[ "$STABILITY" != "stable" ]]; then
travis_retry composer config minimum-stability $STABILITY;
fi
- if [[ "$COVERALLS_ENABLED" != "true" ]]; then
phpenv config-rm xdebug.ini || true;
fi
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
travis_retry composer require --dev php-coveralls/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
fi
install:
- travis_retry composer update --prefer-dist --no-interaction --no-progress --ansi $COMPOSER_FLAGS
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;

View File

@@ -1,3 +1,14 @@
# Version 4.2.0
* Added custom index for job status
# Version 4.1.0
* Added custom index for exception log
# Version 4.0.0
* Added Symfony 6 support
* PHP minimum requirements bumped to 8.0
# Version 3.1.0
* Added optional "messenger mode", to delegate jobs execution to workers from the Symfony messenger component

View File

@@ -3,10 +3,6 @@
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
[![Build Status](https://travis-ci.com/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps that can be synchronous or asynchronous
@@ -20,7 +16,6 @@ As the following schema shows, you can define more than one dataflow:
![Dataflow schema](src/Resources/doc/schema.png)
# Features
* Define and configure a Dataflow
@@ -37,8 +32,6 @@ As the following schema shows, you can define more than one dataflow:
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
### Add the dependency
To install this bundle, run this command :
@@ -609,6 +602,8 @@ $ bin/console code-rhapsodie:dataflow:run-pending --connection=default
Please report issues and request features at https://github.com/code-rhapsodie/dataflow-bundle/issues.
Please note that only the last release of the 3.x and the 4.x versions of this bundle are actively supported.
# Contributing
Contributions are very welcome. Please see [CONTRIBUTING.md](CONTRIBUTING.md) for

View File

@@ -18,17 +18,8 @@ class AbstractDataflowTypeTest extends TestCase
$dataflowType = new class($label, $options, $values, $testCase) extends AbstractDataflowType
{
private $label;
private $options;
private $values;
private $testCase;
public function __construct(string $label, array $options, array $values, TestCase $testCase)
public function __construct(private string $label, private array $options, private array $values, private TestCase $testCase)
{
$this->label = $label;
$this->options = $options;
$this->values = $values;
$this->testCase = $testCase;
}
public function getLabel(): string

View File

@@ -15,9 +15,7 @@ class AMPAsyncDataflowTest extends TestCase
$reader = [1, 2, 3];
$result = [];
$dataflow = new AMPAsyncDataflow($reader, 'simple');
$dataflow->addStep(static function($item) {
return $item + 1;
});
$dataflow->addStep(static fn($item) => $item + 1);
$dataflow->addStep(static function($item): \Generator {
yield new Delayed(10); //delay 10 milliseconds
return $item * 2;

View File

@@ -43,9 +43,7 @@ class CollectionWriterTest extends TestCase
$embeddedWriter
->expects($this->exactly(count($values)))
->method('write')
->withConsecutive(...array_map(function ($item) {
return [$item];
}, $values))
->withConsecutive(...array_map(fn($item) => [$item], $values))
;
$writer = new CollectionWriter($embeddedWriter);

View File

@@ -10,34 +10,24 @@ use PHPUnit\Framework\TestCase;
class DelegatorWriterTest extends TestCase
{
/** @var DelegatorWriter */
private $delegatorWriter;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter $delegatorWriter;
/** @var DelegateWriterInterface|MockObject */
private $delegateInt;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateInt;
/** @var DelegateWriterInterface|MockObject */
private $delegateString;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateString;
/** @var DelegateWriterInterface|MockObject */
private $delegateArray;
private \CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegateWriterInterface|\PHPUnit\Framework\MockObject\MockObject $delegateArray;
protected function setUp(): void
{
$this->delegateInt = $this->createMock(DelegateWriterInterface::class);
$this->delegateInt->method('supports')->willReturnCallback(function ($argument) {
return is_int($argument);
});
$this->delegateInt->method('supports')->willReturnCallback(fn($argument) => is_int($argument));
$this->delegateString = $this->createMock(DelegateWriterInterface::class);
$this->delegateString->method('supports')->willReturnCallback(function ($argument) {
return is_string($argument);
});
$this->delegateString->method('supports')->willReturnCallback(fn($argument) => is_string($argument));
$this->delegateArray = $this->createMock(DelegateWriterInterface::class);
$this->delegateArray->method('supports')->willReturnCallback(function ($argument) {
return is_array($argument);
});
$this->delegateArray->method('supports')->willReturnCallback(fn($argument) => is_array($argument));
$this->delegatorWriter = new DelegatorWriter();
$this->delegatorWriter->addDelegates([

View File

@@ -9,24 +9,19 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
use CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\DBAL\Connection;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class ScheduledDataflowManagerTest extends TestCase
{
/** @var ScheduledDataflowManager */
private $manager;
private \CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager $manager;
/** @var Connection|MockObject */
private $connection;
private \Doctrine\DBAL\Connection|\PHPUnit\Framework\MockObject\MockObject $connection;
/** @var ScheduledDataflowRepository|MockObject */
private $scheduledDataflowRepository;
private \CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository|\PHPUnit\Framework\MockObject\MockObject $scheduledDataflowRepository;
/** @var JobRepository|MockObject */
private $jobRepository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $jobRepository;
protected function setUp(): void
{
@@ -70,16 +65,12 @@ class ScheduledDataflowManagerTest extends TestCase
->expects($this->once())
->method('save')
->with(
$this->callback(function (Job $job) use ($type, $options, $next, $label, $scheduled2) {
return (
$job->getStatus() === Job::STATUS_PENDING
&& $job->getDataflowType() === $type
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflowId() === $scheduled2->getId()
);
})
$this->callback(fn(Job $job) => $job->getStatus() === Job::STATUS_PENDING
&& $job->getDataflowType() === $type
&& $job->getOptions() === $options
&& $job->getRequestedDate() == $next
&& $job->getLabel() === $label
&& $job->getScheduledDataflowId() === $scheduled2->getId())
)
;

View File

@@ -14,14 +14,11 @@ use PHPUnit\Framework\TestCase;
class JobMessageHandlerTest extends TestCase
{
/** @var JobRepository|MockObject */
private $repository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
/** @var JobProcessorInterface|MockObject */
private $processor;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
/** @var JobMessageHandler */
private $handler;
private \CodeRhapsodie\DataflowBundle\MessengerMode\JobMessageHandler $handler;
protected function setUp(): void
{

View File

@@ -16,17 +16,13 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class JobProcessorTest extends TestCase
{
/** @var JobProcessor */
private $processor;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessor $processor;
/** @var JobRepository|MockObject */
private $repository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
/** @var DataflowTypeRegistryInterface|MockObject */
private $registry;
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface|\PHPUnit\Framework\MockObject\MockObject $registry;
/** @var EventDispatcherInterface|MockObject */
private $dispatcher;
private \Symfony\Component\EventDispatcher\EventDispatcherInterface|\PHPUnit\Framework\MockObject\MockObject $dispatcher;
protected function setUp(): void
{
@@ -47,22 +43,18 @@ class JobProcessorTest extends TestCase
;
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[
Events::BEFORE_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
[
Events::AFTER_PROCESSING,
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
})
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job)
],
);
} else { // Symfony 5.0+
@@ -71,15 +63,11 @@ class JobProcessorTest extends TestCase
->method('dispatch')
->withConsecutive(
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::BEFORE_PROCESSING,
],
[
$this->callback(function (ProcessingEvent $event) use ($job) {
return $event->getJob() === $job;
}),
$this->callback(fn(ProcessingEvent $event) => $event->getJob() === $job),
Events::AFTER_PROCESSING,
],
);

View File

@@ -10,8 +10,7 @@ use PHPUnit\Framework\TestCase;
class DataflowTypeRegistryTest extends TestCase
{
/** @var DataflowTypeRegistry */
private $registry;
private \CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry $registry;
protected function setUp(): void
{
@@ -33,7 +32,7 @@ class DataflowTypeRegistryTest extends TestCase
$this->registry->registerDataflowType($type);
$this->assertSame($type, $this->registry->getDataflowType(get_class($type)));
$this->assertSame($type, $this->registry->getDataflowType($type::class));
$this->assertSame($type, $this->registry->getDataflowType($alias1));
$this->assertSame($type, $this->registry->getDataflowType($alias2));
$this->assertContains($type, $this->registry->listDataflowTypes());

View File

@@ -13,14 +13,11 @@ use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunnerTest extends TestCase
{
/** @var MessengerDataflowRunner */
private $runner;
private \CodeRhapsodie\DataflowBundle\Runner\MessengerDataflowRunner $runner;
/** @var JobRepository|MockObject */
private $repository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
/** @var MessageBusInterface|MockObject */
private $bus;
private \Symfony\Component\Messenger\MessageBusInterface|\PHPUnit\Framework\MockObject\MockObject $bus;
protected function setUp(): void
{
@@ -50,13 +47,9 @@ class MessengerDataflowRunnerTest extends TestCase
->expects($this->exactly(2))
->method('dispatch')
->withConsecutive([
$this->callback(function ($message) use ($id1) {
return $message instanceof JobMessage && $message->getJobId() === $id1;
})
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id1)
], [
$this->callback(function ($message) use ($id2) {
return $message instanceof JobMessage && $message->getJobId() === $id2;
})
$this->callback(fn($message) => $message instanceof JobMessage && $message->getJobId() === $id2)
])
->willReturnOnConsecutiveCalls(
new Envelope(new JobMessage($id1)),

View File

@@ -11,14 +11,11 @@ use PHPUnit\Framework\TestCase;
class PendingDataflowRunnerTest extends TestCase
{
/** @var PendingDataflowRunner */
private $runner;
private \CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner $runner;
/** @var JobRepository|MockObject */
private $repository;
private \CodeRhapsodie\DataflowBundle\Repository\JobRepository|\PHPUnit\Framework\MockObject\MockObject $repository;
/** @var JobProcessorInterface|MockObject */
private $processor;
private \CodeRhapsodie\DataflowBundle\Processor\JobProcessorInterface|\PHPUnit\Framework\MockObject\MockObject $processor;
protected function setUp(): void
{

View File

@@ -41,26 +41,28 @@
}
},
"require": {
"php": "^7.3||^8.0",
"php": "^8.0",
"ext-json": "*",
"doctrine/dbal": "^2.12||^3.0",
"doctrine/doctrine-bundle": "^1.0||^2.0",
"psr/log": "^1.1",
"symfony/config": "^3.4||^4.0||^5.0",
"symfony/console": "^3.4||^4.0||^5.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0",
"symfony/lock": "^3.4||^4.0||^5.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0",
"symfony/validator": "^3.4||^4.0||^5.0",
"symfony/yaml": "^3.4||^4.0||^5.0"
"monolog/monolog": "^1.0||^2.0",
"psr/log": "^1.1||^2.0||^3.0",
"symfony/config": "^3.4||^4.0||^5.0||^6.0",
"symfony/console": "^3.4||^4.0||^5.0||^6.0",
"symfony/dependency-injection": "^3.4||>=4.1.12||^5.0||^6.0",
"symfony/event-dispatcher": "^3.4||^4.0||^5.0||^6.0",
"symfony/http-kernel": "^3.4||^4.0||^5.0||^6.0",
"symfony/lock": "^3.4||^4.0||^5.0||^6.0",
"symfony/monolog-bridge": "^3.4||^4.0||^5.0||^6.0",
"symfony/options-resolver": "^3.4||^4.0||^5.0||^6.0",
"symfony/validator": "^3.4||^4.0||^5.0||^6.0",
"symfony/yaml": "^3.4||^4.0||^5.0||^6.0"
},
"require-dev": {
"amphp/amp": "^2.5",
"phpunit/phpunit": "^7||^8||^9",
"symfony/messenger": "^4.4||^5.0"
"rector/rector": "^0.13.10",
"symfony/messenger": "^4.4||^5.0||^6.0"
},
"suggest": {
"amphp/amp": "Provide asynchronous steps for your dataflows",

View File

@@ -1,32 +1,21 @@
<?xml version = '1.0' encoding = 'UTF-8'?>
<?xml version="1.0" encoding="UTF-8"?>
<!-- http://www.phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="Tests/bootstrap.php"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
colors="false"
>
<php>
<ini name="error_reporting" value="-1" />
</php>
<testsuites>
<testsuite name="Dataflow tests suite">
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory>./src/</directory>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</whitelist>
</filter>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" backupStaticAttributes="false" bootstrap="Tests/bootstrap.php" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" colors="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<coverage>
<include>
<directory>./src/</directory>
</include>
<exclude>
<directory>Tests/</directory>
<directory>vendor/</directory>
</exclude>
</coverage>
<php>
<ini name="error_reporting" value="-1"/>
</php>
<testsuites>
<testsuite name="Dataflow tests suite">
<directory suffix="Test.php">./Tests</directory>
</testsuite>
</testsuites>
</phpunit>

25
rector.php Normal file
View File

@@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Set\ValueObject\LevelSetList;
use Rector\Symfony\Set\SymfonySetList;
return static function (RectorConfig $rectorConfig): void {
$rectorConfig->paths([
__DIR__ . '/src',
__DIR__ . '/Tests',
]);
// register a single rule
$rectorConfig->rule(InlineConstructorDefaultToPropertyRector::class);
$rectorConfig->sets([
SymfonySetList::SYMFONY_60,
SymfonySetList::SYMFONY_CODE_QUALITY,
SymfonySetList::SYMFONY_CONSTRUCTOR_INJECTION,
LevelSetList::UP_TO_PHP_80,
]);
};

4
sonar-project.properties Normal file
View File

@@ -0,0 +1,4 @@
sonar.projectKey=code-rhapsodie_dataflow-bundle_AYvYuaAwWE9sbcQmD1vw
sonar.sources=src
sonar.tests=Tests

View File

@@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\BusCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DataflowTypeCompilerPass;
use CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler\DefaultLoggerCompilerPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\ExtensionInterface;
use Symfony\Component\HttpKernel\Bundle\Bundle;
/**
@@ -18,7 +19,7 @@ class CodeRhapsodieDataflowBundle extends Bundle
{
protected $name = 'CodeRhapsodieDataflowBundle';
public function getContainerExtension()
public function getContainerExtension(): ?ExtensionInterface
{
return new CodeRhapsodieDataflowExtension();
}

View File

@@ -22,24 +22,9 @@ class AddScheduledDataflowCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:add';
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ValidatorInterface */
private $validator;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ScheduledDataflowRepository $scheduledDataflowRepository, ValidatorInterface $validator, ConnectionFactory $connectionFactory)
public function __construct(private DataflowTypeRegistryInterface $registry, private ScheduledDataflowRepository $scheduledDataflowRepository, private ValidatorInterface $validator, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->validator = $validator;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -63,7 +48,7 @@ class AddScheduledDataflowCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
@@ -108,7 +93,7 @@ class AddScheduledDataflowCommand extends Command
'id' => null,
'label' => $label,
'dataflow_type' => $type,
'options' => json_decode($options, true),
'options' => json_decode($options, true, 512, JSON_THROW_ON_ERROR),
'frequency' => $frequency,
'next' => new \DateTimeImmutable($firstRun),
'enabled' => $enabled,

View File

@@ -21,18 +21,9 @@ class ChangeScheduleStatusCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:change-status';
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -52,7 +43,7 @@ class ChangeScheduleStatusCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -26,18 +26,9 @@ class ExecuteDataflowCommand extends Command implements LoggerAwareInterface
protected static $defaultName = 'code-rhapsodie:dataflow:execute';
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(DataflowTypeRegistryInterface $registry, ConnectionFactory $connectionFactory)
public function __construct(private DataflowTypeRegistryInterface $registry, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->registry = $registry;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -61,13 +52,13 @@ EOF
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
}
$fqcnOrAlias = $input->getArgument('fqcn');
$options = json_decode($input->getArgument('options'), true);
$options = json_decode($input->getArgument('options'), true, 512, JSON_THROW_ON_ERROR);
$io = new SymfonyStyle($input, $output);
$dataflowType = $this->registry->getDataflowType($fqcnOrAlias);

View File

@@ -26,18 +26,9 @@ class JobShowCommand extends Command
protected static $defaultName = 'code-rhapsodie:dataflow:job:show';
/** @var JobRepository */
private $jobRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(JobRepository $jobRepository, ConnectionFactory $connectionFactory)
public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->jobRepository = $jobRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -57,7 +48,7 @@ class JobShowCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));
@@ -97,21 +88,19 @@ class JobShowCommand extends Command
['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
['Object number', $job->getCount()],
['Errors', count($job->getExceptions())],
['Errors', count((array) $job->getExceptions())],
['Status', $this->translateStatus($job->getStatus())],
];
if ($input->getOption('details')) {
$display[] = ['Type', $job->getDataflowType()];
$display[] = ['Options', json_encode($job->getOptions())];
$display[] = ['Options', json_encode($job->getOptions(), JSON_THROW_ON_ERROR)];
$io->section('Summary');
}
$io->table(['Field', 'Value'], $display);
if ($input->getOption('details')) {
$io->section('Exceptions');
$exceptions = array_map(function (string $exception) {
return substr($exception, 0, 900).'…';
}, $job->getExceptions());
$exceptions = array_map(fn(string $exception) => substr($exception, 0, 900).'…', $job->getExceptions());
$io->write($exceptions);
}

View File

@@ -24,22 +24,9 @@ class RunPendingDataflowsCommand extends Command
protected static $defaultName = 'code-rhapsodie:dataflow:run-pending';
/** @var ScheduledDataflowManagerInterface */
private $manager;
/** @var PendingDataflowRunnerInterface */
private $runner;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowManagerInterface $manager, PendingDataflowRunnerInterface $runner, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowManagerInterface $manager, private PendingDataflowRunnerInterface $runner, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->manager = $manager;
$this->runner = $runner;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -59,7 +46,7 @@ EOF
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');

View File

@@ -19,18 +19,9 @@ class ScheduleListCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:schedule:list';
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ScheduledDataflowRepository $scheduledDataflowRepository, ConnectionFactory $connectionFactory)
public function __construct(private ScheduledDataflowRepository $scheduledDataflowRepository, private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->connectionFactory = $connectionFactory;
}
/**
@@ -47,7 +38,7 @@ class ScheduleListCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -23,14 +23,9 @@ class SchemaCommand extends Command
{
protected static $defaultName = 'code-rhapsodie:dataflow:dump-schema';
/** @var ConnectionFactory */
private $connectionFactory;
public function __construct(ConnectionFactory $connectionFactory)
public function __construct(private ConnectionFactory $connectionFactory)
{
parent::__construct();
$this->connectionFactory = $connectionFactory;
}
/**
@@ -49,7 +44,7 @@ class SchemaCommand extends Command
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (null !== $input->getOption('connection')) {
$this->connectionFactory->setConnectionName($input->getOption('connection'));

View File

@@ -11,29 +11,18 @@ use Symfony\Component\OptionsResolver\OptionsResolver;
class AMPAsyncDataflowBuilder extends DataflowBuilder
{
/** @var int */
protected $loopInterval;
/** @var int */
protected $emitInterval;
public function __construct(?int $loopInterval = 0, ?int $emitInterval = 0)
public function __construct(protected ?int $loopInterval = 0, protected ?int $emitInterval = 0)
{
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
}
/** @var string */
private $name;
private ?string $name = null;
/** @var iterable */
private $reader;
private ?iterable $reader = null;
/** @var array */
private $steps = [];
private array $steps = [];
/** @var WriterInterface[] */
private $writers = [];
private array $writers = [];
public function setName(string $name): self
{

View File

@@ -22,41 +22,18 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var callable[] */
private $steps;
private array $steps = [];
/** @var WriterInterface[] */
private $writers;
private array $writers = [];
/** @var int */
private $loopInterval;
private array $states = [];
/** @var int */
private $emitInterval;
private array $stepsJobs = [];
/** @var array */
private $states;
/** @var array */
private $stepsJobs;
public function __construct(iterable $reader, ?string $name, ?int $loopInterval = 0, ?int $emitInterval = 0)
public function __construct(private iterable $reader, private ?string $name, private ?int $loopInterval = 0, private ?int $emitInterval = 0)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
$this->loopInterval = $loopInterval;
$this->emitInterval = $emitInterval;
$this->states = [];
$this->stepsJobs = [];
if (!function_exists('Amp\\Promise\\wait')) {
throw new RuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}
@@ -137,11 +114,10 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
}
/**
* @param mixed $state
* @param int $count internal count reference
* @param array $exceptions internal exceptions
*/
private function processState($state, int &$count, array &$exceptions): void
private function processState(mixed $state, int &$count, array &$exceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < count($this->steps)) {
@@ -149,7 +125,7 @@ class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
$this->stepsJobs[$stepIndex] = [];
}
[$step, $scale] = $this->steps[$stepIndex];
if (count($this->stepsJobs[$stepIndex]) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
if ((is_countable($this->stepsJobs[$stepIndex]) ? count($this->stepsJobs[$stepIndex]) : 0) < $scale && !isset($this->stepsJobs[$stepIndex][$readIndex])) {
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);

View File

@@ -13,24 +13,16 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var string */
private $name;
/** @var iterable */
private $reader;
/** @var callable[] */
private $steps;
private array $steps = [];
/** @var WriterInterface[] */
private $writers;
private array $writers = [];
public function __construct(iterable $reader, ?string $name)
private ?\Closure $customExceptionIndex = null;
public function __construct(private iterable $reader, private ?string $name)
{
$this->reader = $reader;
$this->name = $name;
$this->steps = [];
$this->writers = [];
}
/**
@@ -53,6 +45,16 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
return $this;
}
/**
* @return $this
*/
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
/**
* {@inheritdoc}
*/
@@ -71,8 +73,17 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
$this->logException($e, (string) $index);
$exceptionIndex = $index;
try {
if (is_callable($this->customExceptionIndex)) {
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
$exceptions[$index] = $e2;
$this->logException($e2, $index);
}
$exceptions[$exceptionIndex] = $e;
$this->logException($e, $exceptionIndex);
}
++$count;
@@ -89,10 +100,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
return new Result($this->name, $startTime, new \DateTimeImmutable(), $count, $exceptions);
}
/**
* @param mixed $item
*/
private function processItem($item): void
private function processItem(mixed $item): void
{
foreach ($this->steps as $step) {
$item = call_user_func($step, $item);
@@ -107,7 +115,7 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
}
}
private function logException(\Throwable $e, ?string $index = null): void
private function logException(\Throwable $e, string|int|null $index = null): void
{
if (!isset($this->logger)) {
return;

View File

@@ -10,17 +10,16 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
class DataflowBuilder
{
/** @var string */
private $name;
private ?string $name = null;
/** @var iterable */
private $reader;
private ?iterable $reader = null;
/** @var array */
private $steps = [];
private array $steps = [];
/** @var WriterInterface[] */
private $writers = [];
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
public function setName(string $name): self
{
@@ -50,6 +49,13 @@ class DataflowBuilder
return $this;
}
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new Dataflow($this->reader, $this->name);
@@ -65,6 +71,10 @@ class DataflowBuilder
$dataflow->addWriter($writer);
}
if (is_callable($this->customExceptionIndex)) {
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
}
return $dataflow;
}
}

View File

@@ -9,39 +9,19 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType;
*/
class Result
{
/** @var string */
private $name;
private \DateInterval $elapsed;
/** @var \DateTimeInterface */
private $startTime;
private int $errorCount = 0;
/** @var \DateTimeInterface */
private $endTime;
private int $successCount = 0;
/** @var \DateInterval */
private $elapsed;
private array $exceptions;
/** @var int */
private $errorCount = 0;
/** @var int */
private $successCount = 0;
/** @var int */
private $totalProcessedCount = 0;
/** @var array */
private $exceptions;
public function __construct(string $name, \DateTimeInterface $startTime, \DateTimeInterface $endTime, int $totalCount, array $exceptions)
public function __construct(private string $name, private \DateTimeInterface $startTime, private \DateTimeInterface $endTime, private int $totalProcessedCount, array $exceptions)
{
$this->name = $name;
$this->startTime = $startTime;
$this->endTime = $endTime;
$this->elapsed = $startTime->diff($endTime);
$this->totalProcessedCount = $totalCount;
$this->errorCount = count($exceptions);
$this->successCount = $totalCount - $this->errorCount;
$this->successCount = $totalProcessedCount - $this->errorCount;
$this->exceptions = $exceptions;
}

View File

@@ -11,15 +11,11 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
*/
class CollectionWriter implements DelegateWriterInterface
{
/** @var WriterInterface */
private $writer;
/**
* CollectionWriter constructor.
*/
public function __construct(WriterInterface $writer)
public function __construct(private WriterInterface $writer)
{
$this->writer = $writer;
}
/**
@@ -36,7 +32,7 @@ class CollectionWriter implements DelegateWriterInterface
public function write($collection)
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', is_object($collection) ? get_class($collection) : gettype($collection)));
throw new UnsupportedItemTypeException(sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
}
foreach ($collection as $item) {

View File

@@ -12,14 +12,13 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnsupportedItemTypeException;
class DelegatorWriter implements DelegateWriterInterface
{
/** @var DelegateWriterInterface[] */
private $delegates;
private array $delegates = [];
/**
* DelegatorWriter constructor.
*/
public function __construct()
{
$this->delegates = [];
}
/**
@@ -47,7 +46,7 @@ class DelegatorWriter implements DelegateWriterInterface
return;
}
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', is_object($item) ? get_class($item) : gettype($item)));
throw new UnsupportedItemTypeException(sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
}
/**

View File

@@ -6,12 +6,8 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Writer;
class PortWriterAdapter implements WriterInterface
{
/** @var \Port\Writer */
private $writer;
public function __construct(\Port\Writer $writer)
public function __construct(private \Port\Writer $writer)
{
$this->writer = $writer;
}
public function prepare()

View File

@@ -16,10 +16,8 @@ interface WriterInterface
/**
* Write an item.
*
* @param mixed $item
*/
public function write($item);
public function write(mixed $item);
/**
* Called after the dataflow is processed.

View File

@@ -10,7 +10,7 @@ use Symfony\Component\Messenger\MessageBusInterface;
class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder()
public function getConfigTreeBuilder(): \Symfony\Component\Config\Definition\Builder\TreeBuilder
{
$treeBuilder = new TreeBuilder('code_rhapsodie_dataflow');
if (method_exists($treeBuilder, 'getRootNode')) {
@@ -39,7 +39,7 @@ class Configuration implements ConfigurationInterface
->end()
->end()
->validate()
->ifTrue(static function ($v): bool { return $v['enabled'] && !interface_exists(MessageBusInterface::class); })
->ifTrue(static fn($v): bool => $v['enabled'] && !interface_exists(MessageBusInterface::class))
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()

View File

@@ -13,10 +13,10 @@ use Symfony\Component\Validator\Constraints as Asserts;
*/
class Job
{
const STATUS_PENDING = 0;
const STATUS_RUNNING = 1;
const STATUS_COMPLETED = 2;
const STATUS_QUEUED = 3;
public const STATUS_PENDING = 0;
public const STATUS_RUNNING = 1;
public const STATUS_COMPLETED = 2;
public const STATUS_QUEUED = 3;
private const KEYS = [
'id',
@@ -32,74 +32,35 @@ class Job
'end_time',
];
/**
* @var int|null
*/
private $id;
private ?int $id = null;
/**
* @var int
*
* @Asserts\Range(min=0, max=2)
*/
private $status;
#[Asserts\Range(min: 0, max: 2)]
private int $status = self::STATUS_PENDING;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:] ]+\z#u')]
private ?string $label = null;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:]\\\]+\z#u')]
private ?string $dataflowType = null;
/**
* @var array|null
*/
private $options;
private ?array $options = null;
/**
* @var \DateTimeInterface|null
*/
private $requestedDate;
private ?\DateTimeInterface $requestedDate = null;
/**
* @var int|null
*/
private $scheduledDataflowId;
private ?int $scheduledDataflowId = null;
/**
* @var int|null
*/
private $count;
private ?int $count = 0;
/**
* @var array|null
*/
private $exceptions;
private ?array $exceptions = null;
/**
* @var \DateTimeInterface|null
*/
private $startTime;
private ?\DateTimeInterface $startTime = null;
/**
* @var \DateTimeInterface|null
*/
private $endTime;
private ?\DateTimeInterface $endTime = null;
/**
* @return Job
*/
public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self
{
return (new static())
@@ -113,8 +74,6 @@ class Job
public function __construct()
{
$this->count = 0;
$this->status = static::STATUS_PENDING;
}
public static function createFromArray(array $datas)

View File

@@ -14,7 +14,7 @@ use Symfony\Component\Validator\Constraints as Asserts;
*/
class ScheduledDataflow
{
const AVAILABLE_FREQUENCIES = [
public const AVAILABLE_FREQUENCIES = [
'1 hour',
'1 day',
'1 week',
@@ -23,51 +23,29 @@ class ScheduledDataflow
private const KEYS = ['id', 'label', 'dataflow_type', 'options', 'frequency', 'next', 'enabled'];
/**
* @var int|null
*/
private $id;
private ?int $id = null;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:] ]+\z#u')]
private ?string $label = null;
#[Asserts\NotBlank]
#[Asserts\Length(min: 1, max: 255)]
#[Asserts\Regex('#^[[:alnum:]\\\]+\z#u')]
private ?string $dataflowType = null;
private ?array $options = null;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:] ]+\z#u")
*/
private $label;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Asserts\Length(min=1, max=255)
* @Asserts\Regex("#^[[:alnum:]\\]+\z#u")
*/
private $dataflowType;
/**
* @var array|null
*/
private $options;
/**
* @var string|null
*
* @Asserts\NotBlank()
* @Frequency()
*/
private $frequency;
#[Asserts\NotBlank]
private ?string $frequency = null;
/**
* @var \DateTimeInterface|null
*/
private $next;
private ?\DateTimeInterface $next = null;
/**
* @var bool|null
*/
private $enabled;
private ?bool $enabled = null;
public static function createFromArray(array $datas)
{

View File

@@ -7,7 +7,7 @@ namespace CodeRhapsodie\DataflowBundle\Event;
/*
* @codeCoverageIgnore
*/
if (class_exists('Symfony\Contracts\EventDispatcher\Event')) {
if (class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
// For Symfony 5.0+
abstract class CrEvent extends \Symfony\Contracts\EventDispatcher\Event
{

View File

@@ -6,6 +6,6 @@ namespace CodeRhapsodie\DataflowBundle\Event;
final class Events
{
const BEFORE_PROCESSING = 'coderhapsodie.dataflow.before_processing';
const AFTER_PROCESSING = 'coderhapsodie.dataflow.after_processing';
public const BEFORE_PROCESSING = 'coderhapsodie.dataflow.before_processing';
public const AFTER_PROCESSING = 'coderhapsodie.dataflow.after_processing';
}

View File

@@ -13,15 +13,11 @@ use CodeRhapsodie\DataflowBundle\Entity\Job;
*/
class ProcessingEvent extends CrEvent
{
/** @var Job */
private $job;
/**
* ProcessingEvent constructor.
*/
public function __construct(Job $job)
public function __construct(private Job $job)
{
$this->job = $job;
}
public function getJob(): Job

View File

@@ -13,14 +13,8 @@ use Symfony\Component\DependencyInjection\Container;
*/
class ConnectionFactory
{
private $connectionName;
private $container;
public function __construct(Container $container, string $connectionName)
public function __construct(private Container $container, private string $connectionName)
{
$this->connectionName = $connectionName;
$this->container = $container;
}
public function setConnectionName(string $connectionName)
@@ -28,7 +22,7 @@ class ConnectionFactory
$this->connectionName = $connectionName;
}
public function getConnection(): \Doctrine\DBAL\Driver\Connection
public function getConnection(): \Doctrine\DBAL\Connection
{
return $this->container->get(sprintf('doctrine.dbal.%s_connection', $this->connectionName));
}

View File

@@ -13,13 +13,11 @@ class BufferHandler extends AbstractProcessingHandler
{
private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n";
private $buffer;
private array $buffer = [];
public function __construct($level = Logger::DEBUG, bool $bubble = true)
{
parent::__construct($level, $bubble);
$this->buffer = [];
}
public function clearBuffer(): array

View File

@@ -10,20 +10,20 @@ use Psr\Log\LoggerInterface;
final class DelegatingLogger extends AbstractLogger
{
/** @var LoggerInterface[] */
private $loggers;
private ?array $loggers = null;
public function __construct(iterable $loggers)
{
foreach ($loggers as $logger) {
if (!$logger instanceof LoggerInterface) {
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, get_class($logger)));
throw new \InvalidArgumentException(sprintf('Only instances of %s should be passed to the constructor of %s. An instance of %s was passed instead.', LoggerInterface::class, self::class, $logger::class));
}
$this->loggers[] = $logger;
}
}
public function log($level, $message, array $context = [])
public function log($level, $message, array $context = []): void
{
foreach ($this->loggers as $logger) {
$logger->log($level, $message, $context);

View File

@@ -8,27 +8,15 @@ use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
/**
* Handles scheduled dataflows execution dates based on their frequency.
*/
class ScheduledDataflowManager implements ScheduledDataflowManagerInterface
{
/** @var ScheduledDataflowRepository */
private $scheduledDataflowRepository;
/** @var JobRepository */
private $jobRepository;
/** @var Connection */
private $connection;
public function __construct(Connection $connection, ScheduledDataflowRepository $scheduledDataflowRepository, JobRepository $jobRepository)
public function __construct(private Connection $connection, private ScheduledDataflowRepository $scheduledDataflowRepository, private JobRepository $jobRepository)
{
$this->connection = $connection;
$this->scheduledDataflowRepository = $scheduledDataflowRepository;
$this->jobRepository = $jobRepository;
}
/**

View File

@@ -6,12 +6,8 @@ namespace CodeRhapsodie\DataflowBundle\MessengerMode;
class JobMessage
{
/** @var int */
private $jobId;
public function __construct(int $jobId)
public function __construct(private int $jobId)
{
$this->jobId = $jobId;
}
public function getJobId(): int

View File

@@ -10,16 +10,8 @@ use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class JobMessageHandler implements MessageSubscriberInterface
{
/** @var JobRepository */
private $repository;
/** @var JobProcessorInterface */
private $processor;
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
public function __construct(private JobRepository $repository, private JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->processor = $processor;
}
public function __invoke(JobMessage $message)

View File

@@ -21,20 +21,8 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;
/** @var JobRepository */
private $repository;
/** @var DataflowTypeRegistryInterface */
private $registry;
/** @var EventDispatcherInterface */
private $dispatcher;
public function __construct(JobRepository $repository, DataflowTypeRegistryInterface $registry, EventDispatcherInterface $dispatcher)
public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
{
$this->repository = $repository;
$this->registry = $registry;
$this->dispatcher = $dispatcher;
}
public function process(Job $job): void
@@ -66,7 +54,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
private function beforeProcessing(Job $job): void
{
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::BEFORE_PROCESSING);
@@ -90,7 +78,7 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
$this->repository->save($job);
// Symfony 3.4 to 4.4 call
if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
if (!class_exists(\Symfony\Contracts\EventDispatcher\Event::class)) {
$this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
} else { // Symfony 5.0+ call
$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);

View File

@@ -13,10 +13,10 @@ use CodeRhapsodie\DataflowBundle\Exceptions\UnknownDataflowTypeException;
class DataflowTypeRegistry implements DataflowTypeRegistryInterface
{
/** @var array|DataflowTypeInterface[] */
private $fqcnRegistry = [];
private array $fqcnRegistry = [];
/** @var array|DataflowTypeInterface[] */
private $aliasesRegistry = [];
private array $aliasesRegistry = [];
/**
* {@inheritdoc}
@@ -31,7 +31,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
return $this->aliasesRegistry[$fqcnOrAlias];
}
throw UnknownDataflowTypeException::create($fqcnOrAlias, array_merge(array_keys($this->fqcnRegistry), array_keys($this->aliasesRegistry)));
throw UnknownDataflowTypeException::create($fqcnOrAlias, [...array_keys($this->fqcnRegistry), ...array_keys($this->aliasesRegistry)]);
}
/**
@@ -47,7 +47,7 @@ class DataflowTypeRegistry implements DataflowTypeRegistryInterface
*/
public function registerDataflowType(DataflowTypeInterface $dataflowType): void
{
$this->fqcnRegistry[get_class($dataflowType)] = $dataflowType;
$this->fqcnRegistry[$dataflowType::class] = $dataflowType;
foreach ($dataflowType->getAliases() as $alias) {
$this->aliasesRegistry[$alias] = $dataflowType;
}

View File

@@ -38,7 +38,7 @@ trait InitFromDbTrait
return [];
}
$array = json_decode($value, true);
$array = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
return (false === $array) ? [] : $array;
}

View File

@@ -6,7 +6,7 @@ namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
@@ -35,17 +35,11 @@ class JobRepository
'end_time' => 'datetime',
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
public function __construct(private Connection $connection)
{
$this->connection = $connection;
}
public function find(int $jobId)
@@ -142,10 +136,10 @@ class JobRepository
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
}
if (is_array($datas['exceptions'])) {
$datas['exceptions'] = json_encode($datas['exceptions']);
$datas['exceptions'] = json_encode($datas['exceptions'], JSON_THROW_ON_ERROR);
}
if (null === $job->getId()) {

View File

@@ -5,7 +5,7 @@ declare(strict_types=1);
namespace CodeRhapsodie\DataflowBundle\Repository;
use CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow;
use Doctrine\DBAL\Driver\Connection;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
@@ -29,17 +29,12 @@ class ScheduledDataflowRepository
'next' => 'datetime',
'enabled' => ParameterType::BOOLEAN,
];
/**
* @var \Doctrine\DBAL\Connection
*/
private $connection;
/**
* JobRepository constructor.
*/
public function __construct(Connection $connection)
public function __construct(private Connection $connection)
{
$this->connection = $connection;
}
/**
@@ -84,7 +79,7 @@ class ScheduledDataflowRepository
return [];
}
while (false !== ($row = $stmt->fetchAssociative())) {
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initOptions($row)));
yield ScheduledDataflow::createFromArray($this->initDateTime($this->initArray($row)));
}
}
@@ -106,7 +101,7 @@ class ScheduledDataflowRepository
unset($datas['id']);
if (is_array($datas['options'])) {
$datas['options'] = json_encode($datas['options']);
$datas['options'] = json_encode($datas['options'], JSON_THROW_ON_ERROR);
}
if (null === $scheduledDataflow->getId()) {

View File

@@ -11,16 +11,8 @@ use Symfony\Component\Messenger\MessageBusInterface;
class MessengerDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var JobRepository */
private $repository;
/** @var MessageBusInterface */
private $bus;
public function __construct(JobRepository $repository, MessageBusInterface $bus)
public function __construct(private JobRepository $repository, private MessageBusInterface $bus)
{
$this->repository = $repository;
$this->bus = $bus;
}
public function runPendingDataflows(): void

View File

@@ -9,16 +9,8 @@ use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
class PendingDataflowRunner implements PendingDataflowRunnerInterface
{
/** @var JobRepository */
private $repository;
/** @var JobProcessorInterface */
private $processor;
public function __construct(JobRepository $repository, JobProcessorInterface $processor)
public function __construct(private JobRepository $repository, private JobProcessorInterface $processor)
{
$this->repository = $repository;
$this->processor = $processor;
}
/**

View File

@@ -48,6 +48,7 @@ class DataflowSchemaProvider
$tableSchedule->addColumn('enabled', 'boolean', ['notnull' => true]);
$tableJob->addForeignKeyConstraint($tableSchedule, ['scheduled_dataflow_id'], ['id']);
$tableJob->addIndex(['status'], 'idx_status');
return $schema;
}