refactor: extension worker (#1910)

* refactor: extension worker

* feat: optional HTTP request

* allow passing unsafe.Pointer to the extension callback

* lint

* simplify
This commit is contained in:
Kévin Dunglas
2025-10-09 14:10:09 +02:00
committed by GitHub
parent 1f6f768c97
commit c42d287138
9 changed files with 232 additions and 195 deletions

10
cgi.go
View File

@@ -214,8 +214,10 @@ func go_register_variables(threadIndex C.uintptr_t, trackVarsArray *C.zval) {
thread := phpThreads[threadIndex]
fc := thread.getRequestContext()
addKnownVariablesToServer(thread, fc, trackVarsArray)
addHeadersToServer(fc, trackVarsArray)
if fc.request != nil {
addKnownVariablesToServer(thread, fc, trackVarsArray)
addHeadersToServer(fc, trackVarsArray)
}
// The Prepared Environment is registered last and can overwrite any previous values
addPreparedEnvToServer(fc, trackVarsArray)
@@ -280,6 +282,10 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info)
fc := thread.getRequestContext()
request := fc.request
if request == nil {
return C.bool(fc.worker != nil)
}
authUser, authPassword, ok := request.BasicAuth()
if ok {
if authPassword != "" {

View File

@@ -42,13 +42,18 @@ func fromContext(ctx context.Context) (fctx *frankenPHPContext, ok bool) {
return
}
// NewRequestWithContext creates a new FrankenPHP request context.
func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Request, error) {
fc := &frankenPHPContext{
func newFrankenPHPContext() *frankenPHPContext {
return &frankenPHPContext{
done: make(chan any),
startedAt: time.Now(),
request: r,
}
}
// NewRequestWithContext creates a new FrankenPHP request context.
func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Request, error) {
fc := newFrankenPHPContext()
fc.request = r
for _, o := range opts {
if err := o(fc); err != nil {
return nil, err
@@ -132,6 +137,10 @@ func (fc *frankenPHPContext) validate() bool {
}
func (fc *frankenPHPContext) clientHasClosed() bool {
if fc.request == nil {
return false
}
select {
case <-fc.request.Context().Done():
return true

View File

@@ -215,7 +215,7 @@ func Init(options ...Option) error {
registerExtensions()
// add registered external workers
for _, ew := range externalWorkers {
for _, ew := range extensionWorkers {
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env())))
}
@@ -527,8 +527,12 @@ func go_read_post(threadIndex C.uintptr_t, cBuf *C.char, countBytes C.size_t) (r
//export go_read_cookies
func go_read_cookies(threadIndex C.uintptr_t) *C.char {
cookies := phpThreads[threadIndex].getRequestContext().request.Header.Values("Cookie")
cookie := strings.Join(cookies, "; ")
request := phpThreads[threadIndex].getRequestContext().request
if request == nil {
return nil
}
cookie := strings.Join(request.Header.Values("Cookie"), "; ")
if cookie == "" {
return nil
}

View File

@@ -1,145 +0,0 @@
package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
"sync/atomic"
)
// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard
// http.Request that will be conferred to the underlying worker script.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
// These methods are only called once at startup, so register them in an init() function.
//
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type WorkerExtension interface {
Name() string
FileName() string
Env() PreparedEnv
GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
ProvideRequest() *WorkerRequest[any, any]
}
// EXPERIMENTAL
type WorkerRequest[P any, R any] struct {
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
// CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback
CallbackParameters P
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
AfterFunc func(callbackReturn R)
}
var externalWorkers = make(map[string]WorkerExtension)
var externalWorkerMutex sync.Mutex
// EXPERIMENTAL
func RegisterExternalWorker(worker WorkerExtension) {
externalWorkerMutex.Lock()
defer externalWorkerMutex.Unlock()
externalWorkers[worker.Name()] = worker
}
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
for {
rq := externalWorker.ProvideRequest()
if rq == nil || rq.Request == nil {
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
continue
}
r := rq.Request
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}
if fc, ok := fromContext(fr.Context()); ok {
fc.responseWriter = rq.Response
fc.handlerParameters = rq.CallbackParameters
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
w.requestChan <- fc
if rq.AfterFunc != nil {
go func() {
<-fc.done
if rq.AfterFunc != nil {
rq.AfterFunc(fc.handlerReturn)
}
}()
}
}
}
}
type Worker struct {
ExtensionName string
WorkerFileName string
WorkerEnv PreparedEnv
MinThreads int
RequestChan chan *WorkerRequest[any, any]
ActivatedCount atomic.Int32
DrainCount atomic.Int32
}
func (w *Worker) Name() string {
return w.ExtensionName
}
func (w *Worker) FileName() string {
return w.WorkerFileName
}
func (w *Worker) Env() PreparedEnv {
return w.WorkerEnv
}
func (w *Worker) GetMinThreads() int {
return w.MinThreads
}
func (w *Worker) ThreadActivatedNotification(threadId int) {
w.ActivatedCount.Add(1)
}
func (w *Worker) ThreadDrainNotification(threadId int) {
w.DrainCount.Add(1)
}
func (w *Worker) ThreadDeactivatedNotification(threadId int) {
w.DrainCount.Add(-1)
w.ActivatedCount.Add(-1)
}
func (w *Worker) ProvideRequest() *WorkerRequest[any, any] {
return <-w.RequestChan
}

View File

@@ -20,12 +20,12 @@ type workerThread struct {
dummyContext *frankenPHPContext
workerContext *frankenPHPContext
backoff *exponentialBackoff
externalWorker WorkerExtension
externalWorker Worker
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
}
func convertToWorkerThread(thread *phpThread, worker *worker) {
externalWorker := externalWorkers[worker.name]
externalWorker := extensionWorkers[worker.name]
thread.setHandler(&workerThread{
state: thread.state,
@@ -204,7 +204,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
handler.workerContext = fc
handler.state.markAsWaiting(false)
logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI))
if fc.request == nil {
logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
} else {
logger.LogAttrs(ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", fc.request.RequestURI))
}
return true, fc.handlerParameters
}
@@ -217,10 +221,18 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool,
hasRequest, parameters := handler.waitForWorkerRequest()
if parameters != nil {
p := PHPValue(parameters)
handler.thread.Pin(p)
var ptr unsafe.Pointer
return C.bool(hasRequest), p
switch p := parameters.(type) {
case unsafe.Pointer:
ptr = p
default:
ptr = PHPValue(ptr)
}
handler.thread.Pin(ptr)
return C.bool(hasRequest), ptr
}
return C.bool(hasRequest), nil
@@ -239,7 +251,11 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval
fc.closeContext()
thread.handler.(*workerThread).workerContext = nil
fc.logger.LogAttrs(context.Background(), slog.LevelDebug, "request handling finished", slog.String("worker", fc.scriptFilename), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
if fc.request == nil {
fc.logger.LogAttrs(context.Background(), slog.LevelDebug, "request handling finished", slog.String("worker", fc.scriptFilename), slog.Int("thread", thread.threadIndex))
} else {
fc.logger.LogAttrs(context.Background(), slog.LevelDebug, "request handling finished", slog.String("worker", fc.scriptFilename), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
}
}
// when frankenphp_finish_request() is directly called from PHP

View File

@@ -5,6 +5,7 @@ package frankenphp
*/
import "C"
import (
"fmt"
"strconv"
"unsafe"
)
@@ -284,7 +285,7 @@ func phpValue(value any) *C.zval {
case []any:
return (*C.zval)(PHPPackedArray(v))
default:
C.__zval_null__(&zval)
panic(fmt.Sprintf("unsupported Go type %T", v))
}
return &zval

View File

@@ -55,7 +55,7 @@ func initWorkers(opt []workerOpt) error {
// create a pipe from the external worker to the main worker
// note: this is locked to the initial thread size the external worker requested
if workerThread, ok := thread.handler.(*workerThread); ok && workerThread.externalWorker != nil {
go startExternalWorkerPipe(w, workerThread.externalWorker, thread)
go startWorker(w, workerThread.externalWorker, thread)
}
workersReady.Done()
}()

167
workerextension.go Normal file
View File

@@ -0,0 +1,167 @@
package frankenphp
import (
"context"
"log/slog"
"net/http"
"sync"
"sync/atomic"
)
// EXPERIMENTAL: Worker allows you to register a worker where instead of calling FrankenPHP handlers on
// frankenphp_handle_request(), the ProvideRequest method is called. You may provide a standard
// http.Request that will be conferred to the underlying worker script.
//
// A worker script with the provided Name and FileName will be registered, along with the provided
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
// These methods are only called once at startup, so register them in an init() function.
//
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
//
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
//
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
// total threads). Don't be greedy.
type Worker interface {
Name() string
FileName() string
Env() PreparedEnv
GetMinThreads() int
ThreadActivatedNotification(threadId int)
ThreadDrainNotification(threadId int)
ThreadDeactivatedNotification(threadId int)
ProvideRequest() *WorkerRequest
InjectRequest(r *WorkerRequest)
}
// EXPERIMENTAL
type WorkerRequest struct {
// The request for your worker script to handle
Request *http.Request
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
Response http.ResponseWriter
// CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback
CallbackParameters any
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
AfterFunc func(callbackReturn any)
}
var extensionWorkers = make(map[string]Worker)
var extensionWorkersMutex sync.Mutex
// EXPERIMENTAL
func RegisterWorker(worker Worker) {
extensionWorkersMutex.Lock()
defer extensionWorkersMutex.Unlock()
extensionWorkers[worker.Name()] = worker
}
// startWorker creates a pipe from a worker to the main worker.
func startWorker(w *worker, extensionWorker Worker, thread *phpThread) {
for {
rq := extensionWorker.ProvideRequest()
var fc *frankenPHPContext
if rq.Request == nil {
fc = newFrankenPHPContext()
fc.logger = logger
} else {
fr, err := NewRequestWithContext(rq.Request, WithOriginalRequest(rq.Request))
if err != nil {
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
continue
}
var ok bool
if fc, ok = fromContext(fr.Context()); !ok {
continue
}
}
fc.worker = w
fc.responseWriter = rq.Response
fc.handlerParameters = rq.CallbackParameters
// Queue the request and wait for completion if Done channel was provided
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
w.requestChan <- fc
if rq.AfterFunc != nil {
go func() {
<-fc.done
if rq.AfterFunc != nil {
rq.AfterFunc(fc.handlerReturn)
}
}()
}
}
}
func NewWorker(name, fileName string, minThreads int, env PreparedEnv) Worker {
return &defaultWorker{
name: name,
fileName: fileName,
env: env,
minThreads: minThreads,
requestChan: make(chan *WorkerRequest),
activatedCount: atomic.Int32{},
drainCount: atomic.Int32{},
}
}
type defaultWorker struct {
name string
fileName string
env PreparedEnv
minThreads int
requestChan chan *WorkerRequest
activatedCount atomic.Int32
drainCount atomic.Int32
}
func (w *defaultWorker) Name() string {
return w.name
}
func (w *defaultWorker) FileName() string {
return w.fileName
}
func (w *defaultWorker) Env() PreparedEnv {
return w.env
}
func (w *defaultWorker) GetMinThreads() int {
return w.minThreads
}
func (w *defaultWorker) ThreadActivatedNotification(_ int) {
w.activatedCount.Add(1)
}
func (w *defaultWorker) ThreadDrainNotification(_ int) {
w.drainCount.Add(1)
}
func (w *defaultWorker) ThreadDeactivatedNotification(_ int) {
w.drainCount.Add(-1)
w.activatedCount.Add(-1)
}
func (w *defaultWorker) ProvideRequest() *WorkerRequest {
return <-w.requestChan
}
func (w *defaultWorker) InjectRequest(r *WorkerRequest) {
w.requestChan <- r
}

View File

@@ -3,7 +3,6 @@ package frankenphp
import (
"io"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
@@ -11,43 +10,23 @@ import (
"github.com/stretchr/testify/require"
)
// mockWorkerExtension implements the WorkerExtension interface
type mockWorkerExtension struct {
// mockWorker implements the Worker interface
type mockWorker struct {
Worker
}
func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
return &mockWorkerExtension{
Worker: Worker{
ExtensionName: name,
WorkerFileName: fileName,
WorkerEnv: nil,
MinThreads: minThreads,
RequestChan: make(chan *WorkerRequest[any, any], minThreads),
ActivatedCount: atomic.Int32{},
DrainCount: atomic.Int32{},
},
}
}
func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) {
m.RequestChan <- r
}
func (m *mockWorkerExtension) GetActivatedCount() int {
return int(m.ActivatedCount.Load())
}
func TestWorkerExtension(t *testing.T) {
// Create a mock extension
mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1)
// Create a mock worker extension
mockExt := &mockWorker{
Worker: NewWorker("mockWorker", "testdata/worker.php", 1, nil),
}
// Register the mock extension
RegisterExternalWorker(mockExt)
RegisterWorker(mockExt)
// Clean up external workers after test to avoid interfering with other tests
defer func() {
delete(externalWorkers, mockExt.Name())
delete(extensionWorkers, mockExt.Name())
}()
// Initialize FrankenPHP with a worker that has a different name than our extension
@@ -59,10 +38,10 @@ func TestWorkerExtension(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Verify that the extension's thread was activated
assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated")
assert.GreaterOrEqual(t, int(mockExt.Worker.(*defaultWorker).activatedCount.Load()), 1, "Thread should have been activated")
// Create a test request
req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil)
req := httptest.NewRequest("GET", "https://example.com/test/?foo=bar", nil)
req.Header.Set("X-Test-Header", "test-value")
w := httptest.NewRecorder()
@@ -71,7 +50,7 @@ func TestWorkerExtension(t *testing.T) {
done := make(chan struct{})
// Inject the request into the worker through the extension
mockExt.InjectRequest(&WorkerRequest[any, any]{
mockExt.InjectRequest(&WorkerRequest{
Request: req,
Response: w,
AfterFunc: func(callbackReturn any) {