Files
archived-frankenphp/scaling.go
Mads Jon Nielsen c099d665a2 feat(caddy): configurable max_idle_time for autoscaled threads (#2225)
Add configurable max_idle_time for autoscaled threads

The idle timeout for autoscaled threads is currently hardcoded to 5
seconds. With bursty traffic patterns, this causes threads to be
deactivated too quickly, leading to repeated cold-start overhead when
the next burst arrives.

This PR replaces the hardcoded constant with a configurable
max_idle_time directive, allowing users to tune how long idle
autoscaled threads stay alive before deactivation. The default remains 5
seconds, preserving existing behavior.

  Usage:

  Caddyfile:
```
  frankenphp {
      max_idle_time 30s
  }
```
  JSON config:
```
  {
      "frankenphp": {
          "max_idle_time": "30s"
      }
  }
```

  Changes:
  - New max_idle_time Caddyfile directive and JSON config option
  - New WithMaxIdleTime functional option
- Replaced hardcoded maxThreadIdleTime constant with configurable
maxIdleTime variable
  - Added tests for custom and default idle time behavior
  - Updated docs
2026-03-06 14:43:37 +01:00

249 lines
7.2 KiB
Go

package frankenphp
import (
"errors"
"log/slog"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/cpu"
"github.com/dunglas/frankenphp/internal/state"
)
// Tuning knobs for the thread autoscaler.
const (
	// requests have to be stalled for at least this amount of time before scaling
	minStallTime = 5 * time.Millisecond
	// time to check for CPU usage before scaling a single thread
	cpuProbeTime = 120 * time.Millisecond
	// do not scale over this amount of CPU usage
	maxCpuUsageForScaling = 0.8
	// interval between passes that deactivate idle autoscaled threads
	downScaleCheckTime = 5 * time.Second
	// max amount of threads stopped in one iteration of downScaleCheckTime
	maxTerminationCount = 10
	// default time an autoscaled thread may be idle before being deactivated
	// (the effective value lives in the maxIdleTime variable, which is configurable)
	defaultMaxIdleTime = 5 * time.Second
)
var (
	// ErrMaxThreadsReached is returned when no inactive thread is available
	// to convert, i.e. the overall thread limit has been hit.
	ErrMaxThreadsReached = errors.New("max amount of overall threads reached")
	// maxIdleTime is how long an autoscaled thread may sit idle before being
	// deactivated; defaults to defaultMaxIdleTime and is configurable
	// (per the max_idle_time option — see WithMaxIdleTime)
	maxIdleTime = defaultMaxIdleTime
	// scaleChan receives the contexts of stalled requests that may trigger
	// upscaling; nil when autoscaling is disabled (maxThreads <= numThreads)
	scaleChan chan *frankenPHPContext
	// autoScaledThreads tracks every thread created by the autoscaler;
	// guarded by scalingMu
	autoScaledThreads = []*phpThread{}
	// scalingMu protects scaleChan and autoScaledThreads
	scalingMu = new(sync.RWMutex)
)
// initAutoScaling launches the upscaling and downscaling goroutines when
// maxThreads leaves headroom above the regular thread count; otherwise it
// disables autoscaling by leaving scaleChan nil.
func initAutoScaling(mainThread *phpMainThread) {
	headroom := mainThread.maxThreads - mainThread.numThreads
	if headroom <= 0 {
		// no room to scale: mark autoscaling as disabled
		scaleChan = nil
		return
	}
	scalingMu.Lock()
	scaleChan = make(chan *frankenPHPContext)
	autoScaledThreads = make([]*phpThread, 0, headroom)
	scalingMu.Unlock()
	go startUpscalingThreads(headroom, scaleChan, mainThread.done)
	go startDownScalingThreads(mainThread.done)
}
// drainAutoScaling logs the number of autoscaled threads at shutdown time.
func drainAutoScaling() {
	scalingMu.Lock()
	defer scalingMu.Unlock()
	if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
		count := len(autoScaledThreads)
		globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "shutting down autoscaling", slog.Int("autoScaledThreads", count))
	}
}
// addRegularThread converts an inactive thread into a regular PHP thread and
// waits until it settles into a terminal state. Returns ErrMaxThreadsReached
// when no inactive thread is available.
func addRegularThread() (*phpThread, error) {
	t := getInactivePHPThread()
	if t == nil {
		return nil, ErrMaxThreadsReached
	}
	convertToRegularThread(t)
	// block until the thread is usable (or shutting down / reclaimed)
	t.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved)
	return t, nil
}
// addWorkerThread converts an inactive thread into a worker thread bound to
// the given worker and waits until it settles into a terminal state. Returns
// ErrMaxThreadsReached when no inactive thread is available.
func addWorkerThread(worker *worker) (*phpThread, error) {
	t := getInactivePHPThread()
	if t == nil {
		return nil, ErrMaxThreadsReached
	}
	convertToWorkerThread(t, worker)
	// block until the thread is usable (or shutting down / reclaimed)
	t.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved)
	return t, nil
}
// scaleWorkerThread adds a worker PHP thread automatically
func scaleWorkerThread(worker *worker) {
	// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
	if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
		return
	}

	scalingMu.Lock()
	defer scalingMu.Unlock()

	// the server may have started shutting down while we were probing
	if !mainThread.state.Is(state.Ready) {
		return
	}

	newThread, err := addWorkerThread(worker)
	if err != nil {
		if globalLogger.Enabled(globalCtx, slog.LevelWarn) {
			globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "could not increase max_threads, consider raising this limit", slog.String("worker", worker.name), slog.Any("error", err))
		}
		return
	}

	autoScaledThreads = append(autoScaledThreads, newThread)
	if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
		globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "upscaling worker thread", slog.String("worker", worker.name), slog.Int("thread", newThread.threadIndex), slog.Int("num_threads", len(autoScaledThreads)))
	}
}
// scaleRegularThread adds a regular PHP thread automatically
func scaleRegularThread() {
	// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
	if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
		return
	}

	scalingMu.Lock()
	defer scalingMu.Unlock()

	// the server may have started shutting down while we were probing
	if !mainThread.state.Is(state.Ready) {
		return
	}

	newThread, err := addRegularThread()
	if err != nil {
		if globalLogger.Enabled(globalCtx, slog.LevelWarn) {
			globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "could not increase max_threads, consider raising this limit", slog.Any("error", err))
		}
		return
	}

	autoScaledThreads = append(autoScaledThreads, newThread)
	if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
		globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "upscaling regular thread", slog.Int("thread", newThread.threadIndex), slog.Int("num_threads", len(autoScaledThreads)))
	}
}
// startUpscalingThreads consumes stalled-request contexts from scale and
// decides whether to add a thread, stopping when done is closed.
func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}) {
	// sleepOrDone blocks for d; it reports false if done was closed first.
	sleepOrDone := func(d time.Duration) bool {
		select {
		case <-done:
			return false
		case <-time.After(d):
			return true
		}
	}

	for {
		scalingMu.Lock()
		currentCount := len(autoScaledThreads)
		scalingMu.Unlock()

		if currentCount >= maxScaledThreads {
			// we have reached max_threads, check again later
			if !sleepOrDone(downScaleCheckTime) {
				return
			}
			continue
		}

		select {
		case <-done:
			return
		case fc := <-scale:
			stalledFor := time.Since(fc.startedAt)
			// if the request has not been stalled long enough, wait and repeat
			if stalledFor < minStallTime {
				if !sleepOrDone(minStallTime - stalledFor) {
					return
				}
				continue
			}
			// the request has been stalled long enough, scale
			if fc.worker == nil {
				scaleRegularThread()
				continue
			}
			// check for max worker threads here again in case requests overflowed while waiting
			if fc.worker.isAtThreadLimit() {
				if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
					globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "cannot scale worker thread, max threads reached for worker", slog.String("worker", fc.worker.name))
				}
				continue
			}
			scaleWorkerThread(fc.worker)
		}
	}
}
// startDownScalingThreads periodically deactivates idle autoscaled threads
// until done is closed. A single reusable timer is re-armed after each pass,
// matching the cadence of the previous per-iteration time.After.
func startDownScalingThreads(done chan struct{}) {
	timer := time.NewTimer(downScaleCheckTime)
	defer timer.Stop()
	for {
		select {
		case <-done:
			return
		case <-timer.C:
			deactivateThreads()
			timer.Reset(downScaleCheckTime)
		}
	}
}
// deactivateThreads checks all autoscaled threads and deactivates those that
// have been idle (waiting) for longer than maxIdleTime, removing them from
// autoScaledThreads. At most maxTerminationCount threads are stopped per
// invocation; threads already in the Reserved state are simply dropped from
// the list.
func deactivateThreads() {
	stoppedThreadCount := 0
	scalingMu.Lock()
	defer scalingMu.Unlock()
	// iterate backwards so removing index i does not shift unvisited elements
	for i := len(autoScaledThreads) - 1; i >= 0; i-- {
		thread := autoScaledThreads[i]
		// the thread might have been stopped otherwise, remove it
		if thread.state.Is(state.Reserved) {
			autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
			continue
		}
		waitTime := thread.state.WaitTime()
		// enforce the per-pass termination cap (was '>', which permitted
		// maxTerminationCount+1 stops per pass); a zero waitTime means the
		// thread is not currently idle, so it is never deactivated
		if stoppedThreadCount >= maxTerminationCount || waitTime == 0 {
			continue
		}
		// convert threads to inactive if they have been idle for too long
		if thread.state.Is(state.Ready) && waitTime > maxIdleTime.Milliseconds() {
			convertToInactiveThread(thread)
			stoppedThreadCount++
			autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
			if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
				globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "downscaling thread", slog.Int("thread", thread.threadIndex), slog.Int64("wait_time", waitTime), slog.Int("num_threads", len(autoScaledThreads)))
			}
			continue
		}
		// TODO: Completely stopping threads is more memory efficient
		// Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory)
		// Reactivate this if there is a better solution or workaround
		// if thread.state.Is(state.Inactive) && waitTime > maxIdleTime.Milliseconds() {
		// 	logger.LogAttrs(nil, slog.LevelDebug, "auto-stopping thread", slog.Int("thread", thread.threadIndex))
		// 	thread.shutdown()
		// 	stoppedThreadCount++
		// 	autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
		// 	continue
		// }
	}
}