mirror of
https://github.com/php/frankenphp.git
synced 2026-03-23 16:42:13 +01:00
perf: various optimizations (#2175)
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@@ -249,7 +250,7 @@ func TestAddModuleWorkerViaAdminApi(t *testing.T) {
|
||||
initialDebugState := getDebugState(t, tester)
|
||||
initialWorkerCount := 0
|
||||
for _, thread := range initialDebugState.ThreadDebugStates {
|
||||
if thread.Name != "" && thread.Name != "ready" {
|
||||
if strings.HasPrefix(thread.Name, "Worker PHP Thread") {
|
||||
initialWorkerCount++
|
||||
}
|
||||
}
|
||||
@@ -286,7 +287,7 @@ func TestAddModuleWorkerViaAdminApi(t *testing.T) {
|
||||
workerFound := false
|
||||
filename, _ := fastabs.FastAbs("../testdata/worker-with-counter.php")
|
||||
for _, thread := range updatedDebugState.ThreadDebugStates {
|
||||
if thread.Name != "" && thread.Name != "ready" {
|
||||
if strings.HasPrefix(thread.Name, "Worker PHP Thread") {
|
||||
updatedWorkerCount++
|
||||
if thread.Name == "Worker PHP Thread - "+filename {
|
||||
workerFound = true
|
||||
|
||||
@@ -130,6 +130,11 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
|
||||
f.ResolveRootSymlink = &rrs
|
||||
}
|
||||
|
||||
// Always pre-compute absolute file names for fallback matching
|
||||
for i := range f.Workers {
|
||||
f.Workers[i].absFileName, _ = fastabs.FastAbs(f.Workers[i].FileName)
|
||||
}
|
||||
|
||||
if !needReplacement(f.Root) {
|
||||
root, err := fastabs.FastAbs(f.Root)
|
||||
if err != nil {
|
||||
@@ -145,13 +150,21 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
|
||||
|
||||
f.resolvedDocumentRoot = root
|
||||
|
||||
// Also resolve symlinks in worker file paths when resolve_root_symlink is true
|
||||
// Resolve symlinks in worker file paths
|
||||
for i, wc := range f.Workers {
|
||||
if !filepath.IsAbs(wc.FileName) {
|
||||
continue
|
||||
if filepath.IsAbs(wc.FileName) {
|
||||
resolvedPath, _ := filepath.EvalSymlinks(wc.FileName)
|
||||
f.Workers[i].FileName = resolvedPath
|
||||
f.Workers[i].absFileName = resolvedPath
|
||||
}
|
||||
resolvedPath, _ := filepath.EvalSymlinks(wc.FileName)
|
||||
f.Workers[i].FileName = resolvedPath
|
||||
}
|
||||
}
|
||||
|
||||
// Pre-compute relative match paths for all workers (requires resolved document root)
|
||||
docRootWithSep := f.resolvedDocumentRoot + string(filepath.Separator)
|
||||
for i := range f.Workers {
|
||||
if strings.HasPrefix(f.Workers[i].absFileName, docRootWithSep) {
|
||||
f.Workers[i].matchRelPath = filepath.ToSlash(f.Workers[i].absFileName[len(f.resolvedDocumentRoot):])
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package caddy
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
@@ -43,6 +44,8 @@ type workerConfig struct {
|
||||
|
||||
options []frankenphp.WorkerOption
|
||||
requestOptions []frankenphp.RequestOption
|
||||
absFileName string
|
||||
matchRelPath string // pre-computed relative URL path for fast matching
|
||||
}
|
||||
|
||||
func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) {
|
||||
@@ -171,15 +174,28 @@ func (wc *workerConfig) inheritEnv(env map[string]string) {
|
||||
}
|
||||
|
||||
func (wc *workerConfig) matchesPath(r *http.Request, documentRoot string) bool {
|
||||
|
||||
// try to match against a pattern if one is assigned
|
||||
if len(wc.MatchPath) != 0 {
|
||||
return (caddyhttp.MatchPath)(wc.MatchPath).Match(r)
|
||||
}
|
||||
|
||||
// if there is no pattern, try to match against the actual path (in the public directory)
|
||||
fullScriptPath, _ := fastabs.FastAbs(documentRoot + "/" + r.URL.Path)
|
||||
absFileName, _ := fastabs.FastAbs(wc.FileName)
|
||||
// fast path: compare the request URL path against the pre-computed relative path
|
||||
if wc.matchRelPath != "" {
|
||||
reqPath := r.URL.Path
|
||||
if reqPath == wc.matchRelPath {
|
||||
return true
|
||||
}
|
||||
|
||||
return fullScriptPath == absFileName
|
||||
// ensure leading slash for relative paths (see #2166)
|
||||
if reqPath == "" || reqPath[0] != '/' {
|
||||
reqPath = "/" + reqPath
|
||||
}
|
||||
|
||||
return path.Clean(reqPath) == wc.matchRelPath
|
||||
}
|
||||
|
||||
// fallback when documentRoot is dynamic (contains placeholders)
|
||||
fullPath, _ := fastabs.FastAbs(filepath.Join(documentRoot, r.URL.Path))
|
||||
|
||||
return fullPath == wc.absFileName
|
||||
}
|
||||
|
||||
33
cgi.go
33
cgi.go
@@ -67,6 +67,20 @@ var knownServerKeys = []string{
|
||||
"REQUEST_URI",
|
||||
}
|
||||
|
||||
// cStringHTTPMethods caches C string versions of common HTTP methods
|
||||
// to avoid allocations in pinCString on every request.
|
||||
var cStringHTTPMethods = map[string]*C.char{
|
||||
"GET": C.CString("GET"),
|
||||
"HEAD": C.CString("HEAD"),
|
||||
"POST": C.CString("POST"),
|
||||
"PUT": C.CString("PUT"),
|
||||
"DELETE": C.CString("DELETE"),
|
||||
"CONNECT": C.CString("CONNECT"),
|
||||
"OPTIONS": C.CString("OPTIONS"),
|
||||
"TRACE": C.CString("TRACE"),
|
||||
"PATCH": C.CString("PATCH"),
|
||||
}
|
||||
|
||||
// computeKnownVariables returns a set of CGI environment variables for the request.
|
||||
//
|
||||
// TODO: handle this case https://github.com/caddyserver/caddy/issues/3718
|
||||
@@ -84,8 +98,9 @@ func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) {
|
||||
}
|
||||
|
||||
// Remove [] from IPv6 addresses
|
||||
ip = strings.Replace(ip, "[", "", 1)
|
||||
ip = strings.Replace(ip, "]", "", 1)
|
||||
if len(ip) > 0 && ip[0] == '[' {
|
||||
ip = ip[1 : len(ip)-1]
|
||||
}
|
||||
|
||||
var https, sslProtocol, sslCipher, rs string
|
||||
|
||||
@@ -98,7 +113,7 @@ func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) {
|
||||
rs = "https"
|
||||
https = "on"
|
||||
|
||||
// and pass the protocol details in a manner compatible with apache's mod_ssl
|
||||
// and pass the protocol details in a manner compatible with Apache's mod_ssl
|
||||
// (which is why these have an SSL_ prefix and not TLS_).
|
||||
if v, ok := tlsProtocolStrings[request.TLS.Version]; ok {
|
||||
sslProtocol = v
|
||||
@@ -138,7 +153,7 @@ func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) {
|
||||
if fc.originalRequest != nil {
|
||||
requestURI = fc.originalRequest.URL.RequestURI()
|
||||
} else {
|
||||
requestURI = request.URL.RequestURI()
|
||||
requestURI = fc.requestURI
|
||||
}
|
||||
|
||||
C.frankenphp_register_bulk(
|
||||
@@ -252,7 +267,7 @@ func splitCgiPath(fc *frankenPHPContext) {
|
||||
// TODO: is it possible to delay this and avoid saving everything in the context?
|
||||
// SCRIPT_FILENAME is the absolute path of SCRIPT_NAME
|
||||
fc.scriptFilename = sanitizedPathJoin(fc.documentRoot, fc.scriptName)
|
||||
fc.worker = getWorkerByPath(fc.scriptFilename)
|
||||
fc.worker = workersByPath[fc.scriptFilename]
|
||||
}
|
||||
|
||||
var splitSearchNonASCII = search.New(language.Und, search.IgnoreCase)
|
||||
@@ -329,7 +344,11 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info)
|
||||
return nil
|
||||
}
|
||||
|
||||
info.request_method = thread.pinCString(request.Method)
|
||||
if m, ok := cStringHTTPMethods[request.Method]; ok {
|
||||
info.request_method = m
|
||||
} else {
|
||||
info.request_method = thread.pinCString(request.Method)
|
||||
}
|
||||
info.query_string = thread.pinCString(request.URL.RawQuery)
|
||||
info.content_length = C.zend_long(request.ContentLength)
|
||||
|
||||
@@ -341,7 +360,7 @@ func go_update_request_info(threadIndex C.uintptr_t, info *C.sapi_request_info)
|
||||
info.path_translated = thread.pinCString(sanitizedPathJoin(fc.documentRoot, fc.pathInfo)) // See: http://www.oreilly.com/openbook/cgi/ch02_04.html
|
||||
}
|
||||
|
||||
info.request_uri = thread.pinCString(request.URL.RequestURI())
|
||||
info.request_uri = thread.pinCString(fc.requestURI)
|
||||
|
||||
info.proto_num = C.int(request.ProtoMajor*1000 + request.ProtoMinor)
|
||||
|
||||
|
||||
10
context.go
10
context.go
@@ -28,13 +28,15 @@ type frankenPHPContext struct {
|
||||
pathInfo string
|
||||
scriptName string
|
||||
scriptFilename string
|
||||
requestURI string
|
||||
|
||||
// Whether the request is already closed by us
|
||||
isDone bool
|
||||
|
||||
responseWriter http.ResponseWriter
|
||||
handlerParameters any
|
||||
handlerReturn any
|
||||
responseWriter http.ResponseWriter
|
||||
responseController *http.ResponseController
|
||||
handlerParameters any
|
||||
handlerReturn any
|
||||
|
||||
done chan any
|
||||
startedAt time.Time
|
||||
@@ -93,6 +95,8 @@ func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Reques
|
||||
splitCgiPath(fc)
|
||||
}
|
||||
|
||||
fc.requestURI = r.URL.RequestURI()
|
||||
|
||||
c := context.WithValue(r.Context(), contextKey, fc)
|
||||
|
||||
return r.WithContext(c), nil
|
||||
|
||||
@@ -611,7 +611,10 @@ func go_sapi_flush(threadIndex C.uintptr_t) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
if err := http.NewResponseController(fc.responseWriter).Flush(); err != nil {
|
||||
if fc.responseController == nil {
|
||||
fc.responseController = http.NewResponseController(fc.responseWriter)
|
||||
}
|
||||
if err := fc.responseController.Flush(); err != nil {
|
||||
ctx := thread.context()
|
||||
|
||||
if globalLogger.Enabled(ctx, slog.LevelWarn) {
|
||||
@@ -683,34 +686,28 @@ func getLogger(threadIndex C.uintptr_t) (*slog.Logger, context.Context) {
|
||||
func go_log(threadIndex C.uintptr_t, message *C.char, level C.int) {
|
||||
logger, ctx := getLogger(threadIndex)
|
||||
|
||||
m := C.GoString(message)
|
||||
le := syslogLevelInfo
|
||||
|
||||
if level >= C.int(syslogLevelEmerg) && level <= C.int(syslogLevelDebug) {
|
||||
le = syslogLevel(level)
|
||||
}
|
||||
|
||||
var slogLevel slog.Level
|
||||
switch le {
|
||||
case syslogLevelEmerg, syslogLevelAlert, syslogLevelCrit, syslogLevelErr:
|
||||
if logger.Enabled(ctx, slog.LevelError) {
|
||||
logger.LogAttrs(ctx, slog.LevelError, m, slog.String("syslog_level", le.String()))
|
||||
}
|
||||
|
||||
slogLevel = slog.LevelError
|
||||
case syslogLevelWarn:
|
||||
if logger.Enabled(ctx, slog.LevelWarn) {
|
||||
logger.LogAttrs(ctx, slog.LevelWarn, m, slog.String("syslog_level", le.String()))
|
||||
}
|
||||
|
||||
slogLevel = slog.LevelWarn
|
||||
case syslogLevelDebug:
|
||||
if logger.Enabled(ctx, slog.LevelDebug) {
|
||||
logger.LogAttrs(ctx, slog.LevelDebug, m, slog.String("syslog_level", le.String()))
|
||||
}
|
||||
|
||||
slogLevel = slog.LevelDebug
|
||||
default:
|
||||
if logger.Enabled(ctx, slog.LevelInfo) {
|
||||
logger.LogAttrs(ctx, slog.LevelInfo, m, slog.String("syslog_level", le.String()))
|
||||
}
|
||||
slogLevel = slog.LevelInfo
|
||||
}
|
||||
|
||||
if !logger.Enabled(ctx, slogLevel) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.LogAttrs(ctx, slogLevel, C.GoString(message), slog.String("syslog_level", le.String()))
|
||||
}
|
||||
|
||||
//export go_log_attrs
|
||||
@@ -805,6 +802,8 @@ func resetGlobals() {
|
||||
globalCtx = context.Background()
|
||||
globalLogger = slog.Default()
|
||||
workers = nil
|
||||
workersByName = nil
|
||||
workersByPath = nil
|
||||
watcherIsEnabled = false
|
||||
globalMu.Unlock()
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ func init() {
|
||||
}
|
||||
|
||||
canonicalWD, err := filepath.EvalSymlinks(wd)
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
wd = canonicalWD
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,40 +4,71 @@ import "C"
|
||||
import (
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type State string
|
||||
type State int
|
||||
|
||||
const (
|
||||
// lifecycle States of a thread
|
||||
Reserved State = "reserved"
|
||||
Booting State = "booting"
|
||||
BootRequested State = "boot requested"
|
||||
ShuttingDown State = "shutting down"
|
||||
Done State = "done"
|
||||
Reserved State = iota
|
||||
Booting
|
||||
BootRequested
|
||||
ShuttingDown
|
||||
Done
|
||||
|
||||
// these States are 'stable' and safe to transition from at any time
|
||||
Inactive State = "inactive"
|
||||
Ready State = "ready"
|
||||
Inactive
|
||||
Ready
|
||||
|
||||
// States necessary for restarting workers
|
||||
Restarting State = "restarting"
|
||||
Yielding State = "yielding"
|
||||
Restarting
|
||||
Yielding
|
||||
|
||||
// States necessary for transitioning between different handlers
|
||||
TransitionRequested State = "transition requested"
|
||||
TransitionInProgress State = "transition in progress"
|
||||
TransitionComplete State = "transition complete"
|
||||
TransitionRequested
|
||||
TransitionInProgress
|
||||
TransitionComplete
|
||||
)
|
||||
|
||||
func (s State) String() string {
|
||||
switch s {
|
||||
case Reserved:
|
||||
return "reserved"
|
||||
case Booting:
|
||||
return "booting"
|
||||
case BootRequested:
|
||||
return "boot requested"
|
||||
case ShuttingDown:
|
||||
return "shutting down"
|
||||
case Done:
|
||||
return "done"
|
||||
case Inactive:
|
||||
return "inactive"
|
||||
case Ready:
|
||||
return "ready"
|
||||
case Restarting:
|
||||
return "restarting"
|
||||
case Yielding:
|
||||
return "yielding"
|
||||
case TransitionRequested:
|
||||
return "transition requested"
|
||||
case TransitionInProgress:
|
||||
return "transition in progress"
|
||||
case TransitionComplete:
|
||||
return "transition complete"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type ThreadState struct {
|
||||
currentState State
|
||||
mu sync.RWMutex
|
||||
subscribers []stateSubscriber
|
||||
// how long threads have been waiting in stable states
|
||||
waitingSince time.Time
|
||||
isWaiting bool
|
||||
// how long threads have been waiting in stable states (unix ms, 0 = not waiting)
|
||||
waitingSince atomic.Int64
|
||||
}
|
||||
|
||||
type stateSubscriber struct {
|
||||
@@ -74,7 +105,7 @@ func (ts *ThreadState) CompareAndSwap(compareTo State, swapTo State) bool {
|
||||
}
|
||||
|
||||
func (ts *ThreadState) Name() string {
|
||||
return string(ts.Get())
|
||||
return ts.Get().String()
|
||||
}
|
||||
|
||||
func (ts *ThreadState) Get() State {
|
||||
@@ -97,20 +128,17 @@ func (ts *ThreadState) notifySubscribers(nextState State) {
|
||||
return
|
||||
}
|
||||
|
||||
var newSubscribers []stateSubscriber
|
||||
|
||||
// notify subscribers to the state change
|
||||
n := 0
|
||||
for _, sub := range ts.subscribers {
|
||||
if !slices.Contains(sub.states, nextState) {
|
||||
newSubscribers = append(newSubscribers, sub)
|
||||
|
||||
ts.subscribers[n] = sub
|
||||
n++
|
||||
continue
|
||||
}
|
||||
|
||||
close(sub.ch)
|
||||
}
|
||||
|
||||
ts.subscribers = newSubscribers
|
||||
ts.subscribers = ts.subscribers[:n]
|
||||
}
|
||||
|
||||
// block until the thread reaches a certain state
|
||||
@@ -156,39 +184,27 @@ func (ts *ThreadState) RequestSafeStateChange(nextState State) bool {
|
||||
|
||||
// MarkAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown
|
||||
func (ts *ThreadState) MarkAsWaiting(isWaiting bool) {
|
||||
ts.mu.Lock()
|
||||
if isWaiting {
|
||||
ts.isWaiting = true
|
||||
ts.waitingSince = time.Now()
|
||||
ts.waitingSince.Store(time.Now().UnixMilli())
|
||||
} else {
|
||||
ts.isWaiting = false
|
||||
ts.waitingSince.Store(0)
|
||||
}
|
||||
ts.mu.Unlock()
|
||||
}
|
||||
|
||||
// IsInWaitingState returns true if a thread is waiting for a request or shutdown
|
||||
func (ts *ThreadState) IsInWaitingState() bool {
|
||||
ts.mu.RLock()
|
||||
isWaiting := ts.isWaiting
|
||||
ts.mu.RUnlock()
|
||||
|
||||
return isWaiting
|
||||
return ts.waitingSince.Load() != 0
|
||||
}
|
||||
|
||||
// WaitTime returns the time since the thread is waiting in a stable state in ms
|
||||
func (ts *ThreadState) WaitTime() int64 {
|
||||
ts.mu.RLock()
|
||||
waitTime := int64(0)
|
||||
if ts.isWaiting {
|
||||
waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli()
|
||||
since := ts.waitingSince.Load()
|
||||
if since == 0 {
|
||||
return 0
|
||||
}
|
||||
ts.mu.RUnlock()
|
||||
|
||||
return waitTime
|
||||
return time.Now().UnixMilli() - since
|
||||
}
|
||||
|
||||
func (ts *ThreadState) SetWaitTime(t time.Time) {
|
||||
ts.mu.Lock()
|
||||
ts.waitingSince = t
|
||||
ts.mu.Unlock()
|
||||
ts.waitingSince.Store(t.UnixMilli())
|
||||
}
|
||||
|
||||
@@ -198,6 +198,9 @@ func go_get_custom_php_ini(disableTimeouts C.bool) *C.char {
|
||||
// Pass the php.ini overrides to PHP before startup
|
||||
// TODO: if needed this would also be possible on a per-thread basis
|
||||
var overrides strings.Builder
|
||||
|
||||
// 32 is an over-estimate for php.ini settings
|
||||
overrides.Grow(len(mainThread.phpIni) * 32)
|
||||
for k, v := range mainThread.phpIni {
|
||||
overrides.WriteString(k)
|
||||
overrides.WriteByte('=')
|
||||
|
||||
@@ -185,8 +185,12 @@ func TestFinishBootingAWorkerScript(t *testing.T) {
|
||||
|
||||
func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) {
|
||||
workers = []*worker{}
|
||||
workersByName = map[string]*worker{}
|
||||
workersByPath = map[string]*worker{}
|
||||
w, err1 := newWorker(workerOpt{fileName: testDataPath + "/index.php"})
|
||||
workers = append(workers, w)
|
||||
workersByName[w.name] = w
|
||||
workersByPath[w.fileName] = w
|
||||
_, err2 := newWorker(workerOpt{fileName: testDataPath + "/index.php"})
|
||||
|
||||
assert.NoError(t, err1)
|
||||
@@ -195,8 +199,12 @@ func TestReturnAnErrorIf2WorkersHaveTheSameFileName(t *testing.T) {
|
||||
|
||||
func TestReturnAnErrorIf2ModuleWorkersHaveTheSameName(t *testing.T) {
|
||||
workers = []*worker{}
|
||||
workersByName = map[string]*worker{}
|
||||
workersByPath = map[string]*worker{}
|
||||
w, err1 := newWorker(workerOpt{fileName: testDataPath + "/index.php", name: "workername"})
|
||||
workers = append(workers, w)
|
||||
workersByName[w.name] = w
|
||||
workersByPath[w.fileName] = w
|
||||
_, err2 := newWorker(workerOpt{fileName: testDataPath + "/hello.php", name: "workername"})
|
||||
|
||||
assert.NoError(t, err1)
|
||||
@@ -242,9 +250,9 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT
|
||||
thread.boot()
|
||||
}
|
||||
},
|
||||
func(thread *phpThread) { convertToWorkerThread(thread, getWorkerByPath(worker1Path)) },
|
||||
func(thread *phpThread) { convertToWorkerThread(thread, workersByPath[worker1Path]) },
|
||||
convertToInactiveThread,
|
||||
func(thread *phpThread) { convertToWorkerThread(thread, getWorkerByPath(worker2Path)) },
|
||||
func(thread *phpThread) { convertToWorkerThread(thread, workersByPath[worker2Path]) },
|
||||
convertToInactiveThread,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ type phpThread struct {
|
||||
threadIndex int
|
||||
requestChan chan contextHolder
|
||||
drainChan chan struct{}
|
||||
handlerMu sync.Mutex
|
||||
handlerMu sync.RWMutex
|
||||
handler threadHandler
|
||||
state *state.ThreadState
|
||||
sandboxedEnv map[string]*C.zend_string
|
||||
@@ -120,9 +120,9 @@ func (thread *phpThread) context() context.Context {
|
||||
}
|
||||
|
||||
func (thread *phpThread) name() string {
|
||||
thread.handlerMu.Lock()
|
||||
thread.handlerMu.RLock()
|
||||
name := thread.handler.name()
|
||||
thread.handlerMu.Unlock()
|
||||
thread.handlerMu.RUnlock()
|
||||
return name
|
||||
}
|
||||
|
||||
|
||||
@@ -158,7 +158,7 @@ func WithRequestLogger(logger *slog.Logger) RequestOption {
|
||||
func WithWorkerName(name string) RequestOption {
|
||||
return func(o *frankenPHPContext) error {
|
||||
if name != "" {
|
||||
o.worker = getWorkerByName(name)
|
||||
o.worker = workersByName[name]
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
20
scaling.go
20
scaling.go
@@ -84,6 +84,11 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
|
||||
|
||||
// scaleWorkerThread adds a worker PHP thread automatically
|
||||
func scaleWorkerThread(worker *worker) {
|
||||
// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
|
||||
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
|
||||
return
|
||||
}
|
||||
|
||||
scalingMu.Lock()
|
||||
defer scalingMu.Unlock()
|
||||
|
||||
@@ -91,11 +96,6 @@ func scaleWorkerThread(worker *worker) {
|
||||
return
|
||||
}
|
||||
|
||||
// probe CPU usage before scaling
|
||||
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
|
||||
return
|
||||
}
|
||||
|
||||
thread, err := addWorkerThread(worker)
|
||||
if err != nil {
|
||||
if globalLogger.Enabled(globalCtx, slog.LevelWarn) {
|
||||
@@ -114,6 +114,11 @@ func scaleWorkerThread(worker *worker) {
|
||||
|
||||
// scaleRegularThread adds a regular PHP thread automatically
|
||||
func scaleRegularThread() {
|
||||
// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
|
||||
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
|
||||
return
|
||||
}
|
||||
|
||||
scalingMu.Lock()
|
||||
defer scalingMu.Unlock()
|
||||
|
||||
@@ -121,11 +126,6 @@ func scaleRegularThread() {
|
||||
return
|
||||
}
|
||||
|
||||
// probe CPU usage before scaling
|
||||
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
|
||||
return
|
||||
}
|
||||
|
||||
thread, err := addRegularThread()
|
||||
if err != nil {
|
||||
if globalLogger.Enabled(globalCtx, slog.LevelWarn) {
|
||||
|
||||
@@ -47,7 +47,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
|
||||
autoScaledThread := phpThreads[2]
|
||||
|
||||
// scale up
|
||||
scaleWorkerThread(getWorkerByPath(workerPath))
|
||||
scaleWorkerThread(workersByPath[workerPath])
|
||||
assert.Equal(t, state.Ready, autoScaledThread.state.Get())
|
||||
|
||||
// on down-scale, the thread will be marked as inactive
|
||||
|
||||
32
worker.go
32
worker.go
@@ -37,6 +37,8 @@ type worker struct {
|
||||
|
||||
var (
|
||||
workers []*worker
|
||||
workersByName map[string]*worker
|
||||
workersByPath map[string]*worker
|
||||
watcherIsEnabled bool
|
||||
startupFailChan chan error
|
||||
)
|
||||
@@ -52,6 +54,8 @@ func initWorkers(opt []workerOpt) error {
|
||||
)
|
||||
|
||||
workers = make([]*worker, 0, len(opt))
|
||||
workersByName = make(map[string]*worker, len(opt))
|
||||
workersByPath = make(map[string]*worker, len(opt))
|
||||
|
||||
for _, o := range opt {
|
||||
w, err := newWorker(o)
|
||||
@@ -61,6 +65,10 @@ func initWorkers(opt []workerOpt) error {
|
||||
|
||||
totalThreadsToStart += w.num
|
||||
workers = append(workers, w)
|
||||
workersByName[w.name] = w
|
||||
if w.allowPathMatching {
|
||||
workersByPath[w.fileName] = w
|
||||
}
|
||||
}
|
||||
|
||||
startupFailChan = make(chan error, totalThreadsToStart)
|
||||
@@ -90,25 +98,6 @@ func initWorkers(opt []workerOpt) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getWorkerByName(name string) *worker {
|
||||
for _, w := range workers {
|
||||
if w.name == name {
|
||||
return w
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getWorkerByPath(path string) *worker {
|
||||
for _, w := range workers {
|
||||
if w.fileName == path && w.allowPathMatching {
|
||||
return w
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newWorker(o workerOpt) (*worker, error) {
|
||||
// Order is important!
|
||||
@@ -118,6 +107,7 @@ func newWorker(o workerOpt) (*worker, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
|
||||
}
|
||||
|
||||
absFileName, err = fastabs.FastAbs(absFileName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
|
||||
@@ -135,10 +125,10 @@ func newWorker(o workerOpt) (*worker, error) {
|
||||
// they can only be matched by their name, not by their path
|
||||
allowPathMatching := !strings.HasPrefix(o.name, "m#")
|
||||
|
||||
if w := getWorkerByPath(absFileName); w != nil && allowPathMatching {
|
||||
if w := workersByPath[absFileName]; w != nil && allowPathMatching {
|
||||
return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName)
|
||||
}
|
||||
if w := getWorkerByName(o.name); w != nil {
|
||||
if w := workersByName[o.name]; w != nil {
|
||||
return w, fmt.Errorf("two workers cannot have the same name: %q", o.name)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user