add error handling and handle shutdowns

Signed-off-by: Robert Landers <landers.robert@gmail.com>
This commit is contained in:
Robert Landers
2025-08-09 12:18:05 +02:00
committed by Kévin Dunglas
parent ceec4a0d3d
commit b23db79d2d
3 changed files with 91 additions and 20 deletions

View File

@@ -302,6 +302,7 @@ func Shutdown() {
drainWatcher()
drainAutoScaling()
drainExternalWorkerPipes()
drainPHPThreads()
metrics.Shutdown()

View File

@@ -1,6 +1,8 @@
package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
)
@@ -20,10 +22,11 @@ import (
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request.
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocation more
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type WorkerExtension interface {
Name() string
@@ -44,9 +47,93 @@ type WorkerRequest struct {
var externalWorkers = make(map[string]WorkerExtension)
var externalWorkerMutex sync.Mutex
var externalWorkerPipes = make(map[string]context.CancelFunc)
var externalWorkerPipesMutex sync.Mutex
func RegisterExternalWorker(worker WorkerExtension) {
externalWorkerMutex.Lock()
defer externalWorkerMutex.Unlock()
externalWorkers[worker.Name()] = worker
}
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
ctx, cancel := context.WithCancel(context.Background())
// Register the cancel function for shutdown
externalWorkerPipesMutex.Lock()
externalWorkerPipes[w.name] = cancel
externalWorkerPipesMutex.Unlock()
go func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "external worker pipe panicked", slog.String("worker", w.name), slog.Any("panic", r))
}
externalWorkerPipesMutex.Lock()
delete(externalWorkerPipes, w.name)
externalWorkerPipesMutex.Unlock()
}()
for {
select {
case <-ctx.Done():
logger.LogAttrs(context.Background(), slog.LevelDebug, "external worker pipe shutting down", slog.String("worker", w.name))
return
default:
}
var rq *WorkerRequest
func() {
defer func() {
if r := recover(); r != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "ProvideRequest panicked", slog.String("worker", w.name), slog.Any("panic", r))
rq = nil
}
}()
rq = externalWorker.ProvideRequest()
}()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
select {
case w.requestChan <- fc:
// Request successfully queued
case <-ctx.Done():
fc.reject(503, "Service Unavailable")
return
}
}
}
}()
}
// drainExternalWorkerPipes shuts down all external worker pipes gracefully
func drainExternalWorkerPipes() {
externalWorkerPipesMutex.Lock()
defer externalWorkerPipesMutex.Unlock()
logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipes", slog.Int("count", len(externalWorkerPipes)))
for workerName, cancel := range externalWorkerPipes {
logger.LogAttrs(context.Background(), slog.LevelDebug, "shutting down external worker pipe", slog.String("worker", workerName))
cancel()
}
// Clear the map
externalWorkerPipes = make(map[string]context.CancelFunc)
}

View File

@@ -3,9 +3,7 @@ package frankenphp
// #include "frankenphp.h"
import "C"
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
@@ -57,22 +55,7 @@ func initWorkers(opt []workerOpt) error {
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
go func(w *worker, externalWorker WorkerExtension) {
for {
// todo: handle shutdown
rq := externalWorker.ProvideRequest()
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
w.requestChan <- fc
}
}
}(w, workerThread.externalWorker)
startExternalWorkerPipe(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()