mirror of
https://github.com/php/frankenphp.git
synced 2026-03-24 00:52:11 +01:00
suggestion: simplify exponential backoff (#1970)
* removes backoff. * Adjusts comment. * Suggestions by @dunglas * Removes 'max_consecutive_failures' * Removes 'max_consecutive_failures' * Adjusts warning. * Disables the logger in tests. * Revert "Adjusts warning." This reverts commite93a6a9301. * Revert "Removes 'max_consecutive_failures'" This reverts commitba28ea0e4a. * Revert "Removes 'max_consecutive_failures'" This reverts commit32e649caf7. * Only fails on max failures again. * Restores failure timings.
This commit is contained in:
committed by
GitHub
parent
4161623736
commit
a36547bc2f
@@ -601,10 +601,12 @@ func testRequestHeaders(t *testing.T, opts *testOptions) {
|
||||
}
|
||||
|
||||
func TestFailingWorker(t *testing.T) {
|
||||
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) {
|
||||
body, _ := testGet("http://example.com/failing-worker.php", handler, t)
|
||||
assert.Contains(t, body, "ok")
|
||||
}, &testOptions{workerScript: "failing-worker.php"})
|
||||
err := frankenphp.Init(
|
||||
frankenphp.WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))),
|
||||
frankenphp.WithWorkers("failing worker", "testdata/failing-worker.php", 4, frankenphp.WithWorkerMaxFailures(1)),
|
||||
frankenphp.WithNumThreads(5),
|
||||
)
|
||||
assert.Error(t, err, "should return an immediate error if workers fail on startup")
|
||||
}
|
||||
|
||||
func TestEnv(t *testing.T) {
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
package backoff
|
||||
|
||||
import "C"
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ExponentialBackoff struct {
|
||||
backoff time.Duration
|
||||
failureCount int
|
||||
mu sync.RWMutex
|
||||
MaxBackoff time.Duration
|
||||
MinBackoff time.Duration
|
||||
MaxConsecutiveFailures int
|
||||
}
|
||||
|
||||
// recordSuccess resets the backoff and failureCount
|
||||
func (e *ExponentialBackoff) RecordSuccess() {
|
||||
e.mu.Lock()
|
||||
e.failureCount = 0
|
||||
e.backoff = e.MinBackoff
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
// recordFailure increments the failure count and increases the backoff, it returns true if MaxConsecutiveFailures has been reached
|
||||
func (e *ExponentialBackoff) RecordFailure() bool {
|
||||
e.mu.Lock()
|
||||
e.failureCount += 1
|
||||
if e.backoff < e.MinBackoff {
|
||||
e.backoff = e.MinBackoff
|
||||
}
|
||||
|
||||
e.backoff = min(e.backoff*2, e.MaxBackoff)
|
||||
|
||||
e.mu.Unlock()
|
||||
return e.MaxConsecutiveFailures != -1 && e.failureCount >= e.MaxConsecutiveFailures
|
||||
}
|
||||
|
||||
// wait sleeps for the backoff duration if failureCount is non-zero.
|
||||
// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple)
|
||||
func (e *ExponentialBackoff) Wait() {
|
||||
e.mu.RLock()
|
||||
if e.failureCount == 0 {
|
||||
e.mu.RUnlock()
|
||||
|
||||
return
|
||||
}
|
||||
e.mu.RUnlock()
|
||||
|
||||
time.Sleep(e.backoff)
|
||||
}
|
||||
|
||||
func (e *ExponentialBackoff) FailureCount() int {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
return e.failureCount
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package backoff
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestExponentialBackoff_Reset(t *testing.T) {
|
||||
e := &ExponentialBackoff{
|
||||
MaxBackoff: 5 * time.Second,
|
||||
MinBackoff: 500 * time.Millisecond,
|
||||
MaxConsecutiveFailures: 3,
|
||||
}
|
||||
|
||||
assert.False(t, e.RecordFailure())
|
||||
assert.False(t, e.RecordFailure())
|
||||
e.RecordSuccess()
|
||||
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0")
|
||||
assert.Equal(t, e.backoff, e.MinBackoff, "expected backoff to be reset to MinBackoff")
|
||||
}
|
||||
|
||||
func TestExponentialBackoff_Trigger(t *testing.T) {
|
||||
e := &ExponentialBackoff{
|
||||
MaxBackoff: 500 * 3 * time.Millisecond,
|
||||
MinBackoff: 500 * time.Millisecond,
|
||||
MaxConsecutiveFailures: 3,
|
||||
}
|
||||
|
||||
assert.False(t, e.RecordFailure())
|
||||
assert.False(t, e.RecordFailure())
|
||||
assert.True(t, e.RecordFailure())
|
||||
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
assert.Equal(t, e.failureCount, e.MaxConsecutiveFailures, "expected failureCount to be MaxConsecutiveFailures")
|
||||
assert.Equal(t, e.backoff, e.MaxBackoff, "expected backoff to be MaxBackoff")
|
||||
}
|
||||
@@ -175,9 +175,9 @@ func TestFinishBootingAWorkerScript(t *testing.T) {
|
||||
|
||||
func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) {
|
||||
workers = []*worker{}
|
||||
w, err1 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
|
||||
w, err1 := newWorker(workerOpt{fileName: "filename.php"})
|
||||
workers = append(workers, w)
|
||||
_, err2 := newWorker(workerOpt{fileName: "filename.php", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
|
||||
_, err2 := newWorker(workerOpt{fileName: "filename.php"})
|
||||
|
||||
assert.NoError(t, err1)
|
||||
assert.Error(t, err2, "two workers cannot have the same filename")
|
||||
@@ -185,9 +185,9 @@ func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) {
|
||||
|
||||
func TestReturnAnErrorIf2ModuleWorkersHaveTheSameName(t *testing.T) {
|
||||
workers = []*worker{}
|
||||
w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
|
||||
w, err1 := newWorker(workerOpt{fileName: "filename.php", name: "workername"})
|
||||
workers = append(workers, w)
|
||||
_, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername", maxConsecutiveFailures: defaultMaxConsecutiveFailures})
|
||||
_, err2 := newWorker(workerOpt{fileName: "filename2.php", name: "workername"})
|
||||
|
||||
assert.NoError(t, err1)
|
||||
assert.Error(t, err2, "two workers cannot have the same name")
|
||||
@@ -198,9 +198,8 @@ func getDummyWorker(fileName string) *worker {
|
||||
workers = []*worker{}
|
||||
}
|
||||
worker, _ := newWorker(workerOpt{
|
||||
fileName: testDataPath + "/" + fileName,
|
||||
num: 1,
|
||||
maxConsecutiveFailures: defaultMaxConsecutiveFailures,
|
||||
fileName: testDataPath + "/" + fileName,
|
||||
num: 1,
|
||||
})
|
||||
workers = append(workers, worker)
|
||||
return worker
|
||||
|
||||
17
testdata/failing-worker.php
vendored
17
testdata/failing-worker.php
vendored
@@ -1,18 +1,7 @@
|
||||
<?php
|
||||
|
||||
$fail = random_int(1, 100) < 10;
|
||||
$wait = random_int(1000 * 100, 1000 * 500); // wait 100ms - 500ms
|
||||
|
||||
usleep($wait);
|
||||
if ($fail) {
|
||||
exit(1);
|
||||
if (rand(1, 100) <= 50) {
|
||||
throw new Exception('this exception is expected to fail the worker');
|
||||
}
|
||||
|
||||
while (frankenphp_handle_request(function () {
|
||||
echo "ok";
|
||||
})) {
|
||||
$fail = random_int(1, 100) < 10;
|
||||
if ($fail) {
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
// frankenphp_handle_request() has not been reached (also a failure)
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/dunglas/frankenphp/internal/backoff"
|
||||
"github.com/dunglas/frankenphp/internal/state"
|
||||
)
|
||||
|
||||
@@ -23,8 +22,8 @@ type workerThread struct {
|
||||
worker *worker
|
||||
dummyContext *frankenPHPContext
|
||||
workerContext *frankenPHPContext
|
||||
backoff *backoff.ExponentialBackoff
|
||||
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
||||
failureCount int // number of consecutive startup failures
|
||||
}
|
||||
|
||||
func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
@@ -32,11 +31,6 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
state: thread.state,
|
||||
thread: thread,
|
||||
worker: worker,
|
||||
backoff: &backoff.ExponentialBackoff{
|
||||
MaxBackoff: 1 * time.Second,
|
||||
MinBackoff: 100 * time.Millisecond,
|
||||
MaxConsecutiveFailures: worker.maxConsecutiveFailures,
|
||||
},
|
||||
})
|
||||
worker.attachThread(thread)
|
||||
}
|
||||
@@ -92,7 +86,6 @@ func (handler *workerThread) name() string {
|
||||
}
|
||||
|
||||
func setupWorkerScript(handler *workerThread, worker *worker) {
|
||||
handler.backoff.Wait()
|
||||
metrics.StartWorker(worker.name)
|
||||
|
||||
if handler.state.Is(state.Ready) {
|
||||
@@ -132,7 +125,6 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
|
||||
// on exit status 0 we just run the worker script again
|
||||
if exitStatus == 0 && !handler.isBootingScript {
|
||||
metrics.StopWorker(worker.name, StopReasonRestart)
|
||||
handler.backoff.RecordSuccess()
|
||||
logger.LogAttrs(ctx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))
|
||||
|
||||
return
|
||||
@@ -148,16 +140,30 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.LogAttrs(ctx, slog.LevelError, "worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
||||
|
||||
// panic after exponential backoff if the worker has never reached frankenphp_handle_request
|
||||
if handler.backoff.RecordFailure() {
|
||||
if !watcherIsEnabled && !handler.state.Is(state.Ready) {
|
||||
logger.LogAttrs(ctx, slog.LevelError, "too many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount()))
|
||||
panic("too many consecutive worker failures")
|
||||
if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures {
|
||||
select {
|
||||
case startupFailChan <- fmt.Errorf("worker failure: script %s has not reached frankenphp_handle_request()", worker.fileName):
|
||||
handler.thread.state.Set(state.ShuttingDown)
|
||||
return
|
||||
}
|
||||
logger.LogAttrs(ctx, slog.LevelWarn, "many consecutive worker failures", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.backoff.FailureCount()))
|
||||
}
|
||||
|
||||
if watcherIsEnabled {
|
||||
// worker script has probably failed due to script changes while watcher is enabled
|
||||
logger.LogAttrs(ctx, slog.LevelWarn, "(watcher enabled) worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
||||
} else {
|
||||
// rare case where worker script has failed on a restart during normal operation
|
||||
// this can happen if startup success depends on external resources
|
||||
logger.LogAttrs(ctx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
||||
}
|
||||
|
||||
// wait a bit and try again (exponential backoff)
|
||||
backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond
|
||||
if backoffDuration > time.Second {
|
||||
backoffDuration = time.Second
|
||||
}
|
||||
handler.failureCount++
|
||||
time.Sleep(backoffDuration)
|
||||
}
|
||||
|
||||
// waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
|
||||
@@ -171,6 +177,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
||||
// Clear the first dummy request created to initialize the worker
|
||||
if handler.isBootingScript {
|
||||
handler.isBootingScript = false
|
||||
handler.failureCount = 0
|
||||
if !C.frankenphp_shutdown_dummy_request() {
|
||||
panic("Not in CGI context")
|
||||
}
|
||||
|
||||
25
worker.go
25
worker.go
@@ -31,41 +31,52 @@ type worker struct {
|
||||
var (
|
||||
workers []*worker
|
||||
watcherIsEnabled bool
|
||||
startupFailChan chan (error)
|
||||
)
|
||||
|
||||
func initWorkers(opt []workerOpt) error {
|
||||
workers = make([]*worker, 0, len(opt))
|
||||
workersReady := sync.WaitGroup{}
|
||||
directoriesToWatch := getDirectoriesToWatch(opt)
|
||||
watcherIsEnabled = len(directoriesToWatch) > 0
|
||||
totalThreadsToStart := 0
|
||||
|
||||
for _, o := range opt {
|
||||
w, err := newWorker(o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
totalThreadsToStart += w.num
|
||||
workers = append(workers, w)
|
||||
}
|
||||
|
||||
startupFailChan = make(chan error, totalThreadsToStart)
|
||||
var workersReady sync.WaitGroup
|
||||
for _, w := range workers {
|
||||
workersReady.Add(w.num)
|
||||
for i := 0; i < w.num; i++ {
|
||||
thread := getInactivePHPThread()
|
||||
convertToWorkerThread(thread, w)
|
||||
go func() {
|
||||
thread.state.WaitFor(state.Ready)
|
||||
workersReady.Done()
|
||||
}()
|
||||
workersReady.Go(func() {
|
||||
thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
workersReady.Wait()
|
||||
|
||||
select {
|
||||
case err := <-startupFailChan:
|
||||
// at least 1 worker has failed, shut down and return an error
|
||||
Shutdown()
|
||||
return fmt.Errorf("failed to initialize workers: %w", err)
|
||||
default:
|
||||
// all workers started successfully
|
||||
startupFailChan = nil
|
||||
}
|
||||
|
||||
if !watcherIsEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
watcherIsEnabled = true
|
||||
if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, logger); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user