Files
archived-frankenphp/worker.go
Marc 1d74b2caa8 feat: define domain specific workers in php_server and php blocks (#1509)
* add module (php_server directive) based workers

* refactor moduleID to uintptr for faster comparisons

* let workers inherit environment variables and root from php_server

* caddy can shift FrankenPHPModules in memory for some godforsaken reason, can't rely on them staying the same

* remove debugging statement

* fix tests

* refactor moduleID to uint64 for faster comparisons

* actually allow multiple workers per script filename

* remove logging

* utility function

* reuse existing worker with same filename and environment when calling newWorker with a filepath that already has a suitable worker, simply add number of threads

* no cleanup happens between tests, so restore old global worker overwriting logic

* add test, use getWorker(ForContext) function in frankenphp.go as well

* bring error on second global worker with the same filename again

* refactor to using name instead of moduleID

* nicer name

* nicer name

* add more tests

* remove test case already covered by previous test

* revert back to single variable, moduleIDs no longer relevant

* update comment

* figure out the worker to use in FrankenPHPModule::ServeHTTP

* add caddy/config_tests, add --retry 5 to download

* add caddy/config_tests

* sum up logic a bit, put worker thread addition into moduleWorkers parsing, before workers are actually created

* implement suggestions as far as possible

* fixup

* remove tags

* feat: download the mostly static binary when possible (#1467)

* feat: download the mostly static binary when possible

* cs

* docs: remove wildcard matcher from root directive (#1513)

* docs: update README with additional documentation links

Add link to classic mode, efficiently serving large static files and monitoring FrankenPHP

Signed-off-by: Romain Bastide <romain.bastide@orange.com>

* ci: combine dependabot updates for one group to 1 pull-request

* feat: compatibility with libphp.dylib on macOS

* feat: upgrade to Caddy 2.10

* feat: upgrade to Caddy 2.10

* chore: run prettier

* fix: build-static.sh consecutive builds (#1496)

* fix consecutive builds

* use minor version in PHP_VERSION

* install jq in centos container

* fix "arm64" download arch for spc binary

* jq is not available as a rpm download

* linter

* specify php 8.4 default

specify 8.4 so we manually switch to 8.5 when we make sure it works
allows to run without jq installed

* Apply suggestions from code review

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

---------

Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>

* chore: update Go and toolchain version (#1526)

* apply suggestions one by one - scriptpath only

* generate unique worker names by filename and number

* support worker config from embedded apps

* rename back to make sure we don't accidentally add FrankenPHPApp workers to the slice

* fix test after changing error message

* use 🧩 for module workers

* use 🌍 for global workers :)

* revert 1c414cebbc

* revert 4cc8893ced

* apply suggestions

* add dynamic config loading test of module worker

* fix test

* minor changes

---------

Signed-off-by: Romain Bastide <romain.bastide@orange.com>
Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
Co-authored-by: Indra Gunawan <hello@indra.my.id>
Co-authored-by: Romain Bastide <romain.bastide@orange.com>
2025-05-05 16:14:19 +02:00

234 lines
5.2 KiB
Go

package frankenphp
// #include "frankenphp.h"
import "C"
import (
"fmt"
"strings"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/dunglas/frankenphp/internal/watcher"
)
// worker represents a worker script and can have many threads assigned to it.
type worker struct {
// name identifies the worker; it is used as the label passed to the metrics
// calls and must be unique among all registered workers
name string
// fileName is the absolute path of the worker script
fileName string
// num is the number of threads started for this worker by initWorkers
num int
// env is the prepared environment of the worker; newWorker adds the
// FRANKENPHP_WORKER flag to it
env PreparedEnv
// requestChan receives the requests that could not be handed directly to
// one of the worker's threads
requestChan chan *frankenPHPContext
// threads lists the threads currently attached to the worker;
// access is guarded by threadMutex
threads []*phpThread
threadMutex sync.RWMutex
}
var (
// workers is the registry of all workers, indexed by the key computed by
// getWorkerKey; it is recreated on every call to initWorkers
workers map[string]*worker
// watcherIsEnabled is set by initWorkers and is true when at least one
// directory to watch was configured
watcherIsEnabled bool
)
// initWorkers recreates the global worker registry from opt, converts the
// configured number of inactive threads into worker threads for every worker
// and blocks until each of those threads has reached stateReady.
//
// When at least one directory to watch is configured, the file watcher is
// initialized afterwards with RestartWorkers as its callback.
//
// It returns the first error reported by newWorker or watcher.InitWatcher.
func initWorkers(opt []workerOpt) error {
	workers = make(map[string]*worker, len(opt))

	watchDirs := getDirectoriesToWatch(opt)
	watcherIsEnabled = len(watchDirs) > 0

	var allReady sync.WaitGroup
	for _, o := range opt {
		w, err := newWorker(o)
		if err != nil {
			return err
		}

		allReady.Add(o.num)
		for started := 0; started < w.num; started++ {
			thread := getInactivePHPThread()
			convertToWorkerThread(thread, w)
			// signal readiness asynchronously so that all threads boot in parallel
			go func() {
				thread.state.waitFor(stateReady)
				allReady.Done()
			}()
		}
	}
	allReady.Wait()

	// without watched directories there is nothing left to set up
	if !watcherIsEnabled {
		return nil
	}

	if err := watcher.InitWatcher(watchDirs, RestartWorkers, logger); err != nil {
		return err
	}

	return nil
}
// getWorkerKey returns the key under which a worker is stored in the workers
// map: the worker name when it starts with "m#", the filename otherwise.
func getWorkerKey(name string, filename string) string {
	if strings.HasPrefix(name, "m#") {
		return name
	}

	return filename
}
// newWorker validates o, registers a new worker in the global workers map and
// returns it. No thread is attached to the worker here.
//
// The worker filename is resolved to an absolute path, and a worker without a
// name is named after that absolute path. The FRANKENPHP_WORKER flag is added
// to the worker environment: o.env is created when nil, an existing map is
// modified in place.
//
// A nil worker and an error are returned when the filename cannot be
// resolved, when the registry key is already taken, or when another worker
// already uses the same name.
func newWorker(o workerOpt) (*worker, error) {
	absFileName, err := fastabs.FastAbs(o.fileName)
	if err != nil {
		return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
	}

	// the key is derived from the name as configured, before any default is applied
	key := getWorkerKey(o.name, absFileName)
	if _, ok := workers[key]; ok {
		return nil, fmt.Errorf("two workers cannot use the same key %q", key)
	}

	// apply the default name before checking uniqueness, otherwise an unnamed
	// worker could silently end up with a name that is already in use
	if o.name == "" {
		o.name = absFileName
	}

	for _, w := range workers {
		if w.name == o.name {
			// never hand out a worker together with an error
			return nil, fmt.Errorf("two workers cannot have the same name: %q", o.name)
		}
	}

	if o.env == nil {
		o.env = make(PreparedEnv, 1)
	}
	o.env["FRANKENPHP_WORKER\x00"] = "1"

	w := &worker{
		name:        o.name,
		fileName:    absFileName,
		num:         o.num,
		env:         o.env,
		requestChan: make(chan *frankenPHPContext),
		threads:     make([]*phpThread, 0, o.num),
	}
	workers[key] = w

	return w, nil
}
// DrainWorkers (EXPERIMENTAL) drains every worker thread so that all worker
// scripts finish before a graceful shutdown. The list of drained threads is
// discarded.
func DrainWorkers() {
	drainWorkerThreads()
}
// drainWorkerThreads asks every thread of every worker to switch to
// stateRestarting, closes the drain channel of each thread that accepted the
// state change and blocks until all of those threads have reached
// stateYielding.
//
// It returns the threads that were drained. Threads that refuse the state
// change are left untouched and are not part of the result.
func drainWorkerThreads() []*phpThread {
	ready := sync.WaitGroup{}
	drainedThreads := make([]*phpThread, 0)

	for _, w := range workers {
		w.threadMutex.RLock()
		for _, thread := range w.threads {
			if !thread.state.requestSafeStateChange(stateRestarting) {
				// no state change allowed == thread is shutting down
				// we'll proceed to restart all other threads anyways
				continue
			}

			// only count the threads that are actually waited for; counting
			// skipped threads would leave ready.Wait() blocked forever
			ready.Add(1)
			close(thread.drainChan)
			drainedThreads = append(drainedThreads, thread)
			go func(thread *phpThread) {
				thread.state.waitFor(stateYielding)
				ready.Done()
			}(thread)
		}
		w.threadMutex.RUnlock()
	}
	ready.Wait()

	return drainedThreads
}
// drainWatcher drains the file watcher; it does nothing when the watcher is
// not enabled.
func drainWatcher() {
	if !watcherIsEnabled {
		return
	}

	watcher.DrainWatcher()
}
// RestartWorkers attempts to restart all workers gracefully: every worker
// thread is drained, then each drained thread gets a fresh drain channel and
// is put back into stateReady.
func RestartWorkers() {
	// thread scaling must not happen while the workers are restarting
	scalingMu.Lock()
	defer scalingMu.Unlock()

	drained := drainWorkerThreads()
	for i := range drained {
		t := drained[i]
		// the previous drain channel was closed while draining
		t.drainChan = make(chan struct{})
		t.state.set(stateReady)
	}
}
// getDirectoriesToWatch collects the watch entries of all worker options
// into a single list, in the order of the options. The result is empty,
// never nil, when nothing is watched.
func getDirectoriesToWatch(workerOpts []workerOpt) []string {
	dirs := make([]string, 0)
	for i := range workerOpts {
		dirs = append(dirs, workerOpts[i].watch...)
	}

	return dirs
}
// attachThread appends thread to the worker's list of threads while holding
// the write lock.
func (w *worker) attachThread(thread *phpThread) {
	w.threadMutex.Lock()
	defer w.threadMutex.Unlock()

	w.threads = append(w.threads, thread)
}
// detachThread removes the first occurrence of thread from the worker's list
// of threads while holding the write lock. The order of the remaining threads
// is kept; nothing happens when thread is not attached.
func (w *worker) detachThread(thread *phpThread) {
	w.threadMutex.Lock()
	defer w.threadMutex.Unlock()

	for idx := range w.threads {
		if w.threads[idx] != thread {
			continue
		}

		// shift the tail one slot to the left and drop the last element
		last := len(w.threads) - 1
		copy(w.threads[idx:], w.threads[idx+1:])
		w.threads = w.threads[:last]
		return
	}
}
// countThreads returns the number of threads currently attached to the
// worker, read under the read lock.
func (w *worker) countThreads() int {
	w.threadMutex.RLock()
	defer w.threadMutex.RUnlock()

	return len(w.threads)
}
// handleRequest dispatches fc to the worker and blocks until the request has
// been handled or rejected.
//
// The request is first offered, without blocking, to each attached thread in
// order. When no thread accepts it, the request is reported as queued and
// offered on the worker's shared request channel and on scaleChan until a
// thread picks it up or the wait times out, in which case it is rejected with
// a 504 status.
func (worker *worker) handleRequest(fc *frankenPHPContext) {
	metrics.StartWorkerRequest(worker.name)

	// dispatch requests to all worker threads in order
	worker.threadMutex.RLock()
	for _, thread := range worker.threads {
		select {
		case thread.requestChan <- fc:
			// release the lock before waiting so that threads can be
			// attached or detached while the request is running
			worker.threadMutex.RUnlock()
			<-fc.done
			metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
			return
		default:
			// thread is busy, continue
		}
	}
	worker.threadMutex.RUnlock()

	// if no thread was available, mark the request as queued and apply the scaling strategy
	metrics.QueuedWorkerRequest(worker.name)
	for {
		select {
		case worker.requestChan <- fc:
			metrics.DequeuedWorkerRequest(worker.name)
			<-fc.done
			metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
			return
		case scaleChan <- fc:
			// the request has triggered scaling, continue to wait for a thread
		case <-timeoutChan(maxWaitTime):
			// the request has timed out stalling: it leaves the queue as well,
			// so it must be reported as dequeued to keep the queue metric balanced
			metrics.DequeuedWorkerRequest(worker.name)
			fc.reject(504, "Gateway Timeout")
			return
		}
	}
}