3 Commits

8 changed files with 48 additions and 93 deletions

View File

@@ -1,3 +0,0 @@
service_name : travis-ci
coverage_clover: var/build/clover.xml
json_path : var/build/upload.json

View File

@@ -1,74 +0,0 @@
language: php
sudo: false
cache:
directories:
- $HOME/.composer/cache
branches:
only:
- master
- /^\d+\.\d+$/
- travis-setup
env:
global:
- SYMFONY_DEPRECATIONS_HELPER="max[self]=0"
- PHPUNIT_FLAGS="-v"
- PHPUNIT_ENABLED="true"
- STABILITY=stable
- COVERALLS_ENABLED="false"
matrix:
fast_finish: true
include:
- php: '8.0'
env:
- SYMFONY_VERSION=3.4.*
- php: '8.0'
env:
- SYMFONY_VERSION=4.4.*
- php: '8.0'
env:
- SYMFONY_VERSION=5.4.*
- php: '8.0'
env:
- SYMFONY_VERSION=6.0.*
- php: '8.1'
env:
- SYMFONY_VERSION=3.4.*
- php: '8.1'
env:
- SYMFONY_VERSION=4.4.*
- php: '8.1'
env:
- SYMFONY_VERSION=5.4.*
- php: '8.1'
env:
- SYMFONY_VERSION=6.0.*
before_install:
- if [[ "$SYMFONY_VERSION" != "" ]]; then
travis_retry composer global require "symfony/flex:^1.4";
composer config extra.symfony.require $SYMFONY_VERSION;
fi
- if [[ "$STABILITY" != "stable" ]]; then
travis_retry composer config minimum-stability $STABILITY;
fi
- if [[ "$COVERALLS_ENABLED" != "true" ]]; then
phpenv config-rm xdebug.ini || true;
fi
- if [[ "$COVERALLS_ENABLED" == "true" ]]; then
travis_retry composer require --dev php-coveralls/php-coveralls:^2.0 --no-update $COMPOSER_FLAGS;
fi
install:
- travis_retry composer update --prefer-dist --no-interaction --no-progress --ansi $COMPOSER_FLAGS
script: ./vendor/bin/phpunit $PHPUNIT_FLAGS
after_success:
- if [[ "$PHPUNIT_ENABLED" == "true" && "$COVERALLS_ENABLED" == "true" ]]; then
./vendor/bin/php-coveralls -vvv --config .coveralls.yml;
fi;

View File

@@ -1,3 +1,7 @@
# Version 4.1.0
* Added custom index for exception log
# Version 4.0.0
* Added Symfony 6 support

View File

@@ -3,10 +3,6 @@
DataflowBundle is a bundle for Symfony 3.4+
providing an easy way to create import / export dataflow.
[![Build Status](https://travis-ci.com/code-rhapsodie/dataflow-bundle.svg?branch=master)](https://travis-ci.com/code-rhapsodie/dataflow-bundle)
[![Coverage Status](https://coveralls.io/repos/github/code-rhapsodie/dataflow-bundle/badge.svg)](https://coveralls.io/github/code-rhapsodie/dataflow-bundle)
Dataflow uses a linear generic workflow in three parts:
* one reader
* any number of steps that can be synchronous or asynchronous
@@ -36,8 +32,6 @@ As the following schema shows, you can define more than one dataflow:
Security notice: Symfony 4.x is not supported before 4.1.12, see https://github.com/advisories/GHSA-pgwj-prpq-jpc2
And basically, every allowed-to-failed jobs in our travis configuration are not fully supported.
### Add the dependency
To install this bundle, run this command :

View File

@@ -19,6 +19,8 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
/** @var WriterInterface[] */
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
public function __construct(private iterable $reader, private ?string $name)
{
}
@@ -43,6 +45,16 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
return $this;
}
/**
* @return $this
*/
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
/**
* {@inheritdoc}
*/
@@ -61,8 +73,17 @@ class Dataflow implements DataflowInterface, LoggerAwareInterface
try {
$this->processItem($item);
} catch (\Throwable $e) {
$exceptions[$index] = $e;
$this->logException($e, (string) $index);
$exceptionIndex = $index;
try {
if (is_callable($this->customExceptionIndex)) {
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
$exceptions[$index] = $e2;
$this->logException($e2, $index);
}
$exceptions[$exceptionIndex] = $e;
$this->logException($e, $exceptionIndex);
}
++$count;

View File

@@ -19,6 +19,8 @@ class DataflowBuilder
/** @var WriterInterface[] */
private array $writers = [];
private ?\Closure $customExceptionIndex = null;
public function setName(string $name): self
{
$this->name = $name;
@@ -47,6 +49,13 @@ class DataflowBuilder
return $this;
}
public function setCustomExceptionIndex(callable $callable): self
{
$this->customExceptionIndex = \Closure::fromCallable($callable);
return $this;
}
public function getDataflow(): DataflowInterface
{
$dataflow = new Dataflow($this->reader, $this->name);
@@ -62,6 +71,10 @@ class DataflowBuilder
$dataflow->addWriter($writer);
}
if (is_callable($this->customExceptionIndex)) {
$dataflow->setCustomExceptionIndex($this->customExceptionIndex);
}
return $dataflow;
}
}

View File

@@ -58,7 +58,7 @@ class JobRepository
$qb
->andWhere($qb->expr()->isNull('scheduled_dataflow_id'))
->andWhere($qb->expr()->eq('status', $qb->createNamedParameter(Job::STATUS_PENDING, ParameterType::INTEGER)));
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -106,7 +106,7 @@ class JobRepository
$qb
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -121,7 +121,7 @@ class JobRepository
$qb->andWhere($qb->expr()->eq('scheduled_dataflow_id', $qb->createNamedParameter($id, ParameterType::INTEGER)))
->orderBy('requested_date', 'DESC')
->setMaxResults(20);
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -162,7 +162,7 @@ class JobRepository
private function returnFirstOrNull(QueryBuilder $qb): ?Job
{
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}

View File

@@ -50,7 +50,7 @@ class ScheduledDataflowRepository
->orderBy('next', 'ASC')
;
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -74,7 +74,7 @@ class ScheduledDataflowRepository
$qb = $this->createQueryBuilder();
$qb->orderBy('label', 'ASC');
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return [];
}
@@ -92,7 +92,7 @@ class ScheduledDataflowRepository
->orderBy('w.label', 'ASC')
->groupBy('w.id');
return $query->executeQuery()->fetchAllAssociative();
return $query->execute()->fetchAllAssociative();
}
public function save(ScheduledDataflow $scheduledDataflow)
@@ -138,7 +138,7 @@ class ScheduledDataflowRepository
private function returnFirstOrNull(QueryBuilder $qb): ?ScheduledDataflow
{
$stmt = $qb->executeQuery();
$stmt = $qb->execute();
if (0 === $stmt->rowCount()) {
return null;
}