mirror of
https://github.com/php/frankenphp.git
synced 2026-03-24 00:52:11 +01:00
Use sync.Pool to remove channel contention (#2025)
This PR adds `sync.Pool` direct handoff instead to both regular and worker threads, building on the semaphore-based FIFO approach in `fix/latency`. The pool eliminates the remaining channel contention and provides significant tail latency improvements, particularly for workers. By using `sync.Pool`, we achieve a lock-free per-P dispatch, reducing contention dramatically. The dispatcher tries the pool first and then falls back to the semaphore implementation. Benchmark Results Workers (8 threads, 500 connections, 15s) | Configuration | Req/sec | P50 | P75 | P90 | P99 | Threads | Global RQ | |----------------|---------|--------|--------|---------|---------|---------|-----------| | latency branch | 78,161 | 6.03ms | 8.08ms | 11.19ms | 18.38ms | 45 | 145 | | pool (this PR) | 76,259 | 6.06ms | 7.45ms | 9.03ms | 12.99ms | 46 | 217 | | Improvement | -2.4% | +0.5% | -7.8% | -19.3% | -29.3% | +1 | +49.7% | Regular Threads (8 threads, 500 connections, 15s) | Configuration | Req/sec | P50 | P75 | P90 | P99 | Threads | Global RQ | |----------------|---------|---------|---------|---------|---------|---------|-----------| | latency branch | 42,096 | 11.46ms | 12.35ms | 13.35ms | 17.18ms | 62 | 3 | | pool (this PR) | 42,592 | 11.40ms | 12.26ms | 13.27ms | 15.95ms | 67 | 11 | | Improvement | +1.2% | -0.5% | -0.7% | -0.6% | -7.2% | +5 | +267% | Low Load (10 connections, 1 thread, 15s) - Regular Threads Only to show no difference | Configuration | Req/sec | P50 | P99 | |----------------|---------|-------|-------| | baseline (main) | 38,354 | 222µs | 650µs | | latency branch | 37,977 | 225µs | 657µs | | pool (this PR) | 38,584 | 222µs | 643µs | | Improvement | +1.6% | -1.3% | -2.1% | ## Thread Affinity Tradeoff Workers previously had a low-index affinity and would target the same threads under a low load (t[0], t[1], etc.). This minimised resource initialisation when frameworks lazily created resources (e.g. database connections). The new behaviour in this PR uses `sync.Pool` which uses per-P (processor) locality, distributing requests across multiple threads even under low loads. I think this is actually a good thing, as the previous behaviour is actively dangerous in production scenarios. Consider a scenario with 120 worker threads for an i/o heavy workload (this is actually our advice in multiple issues). Under normal load, maybe only t[0-80] are usually active, and thus only 80 database connections are open. On a Black Friday, the load spikes, and we activate t[101] for the first time, but it exceeds a database connection limit of 100, causing cascading failures during peak loads. This is the worst possible time to discover resource limits. With `sync.Pool`, a normal load eventually cycles through all 120 workers, ensuring no surprises under load. It’s worth noting that with per-P locality there’s a high probability you’ll get the same worker on the same connection, keeping various caches (CPU L1/L2/L3, etc.). This is actually probably better than the low-index affinity for cache efficiency. ## Further improvements Further improvements will result in diminishing returns. Based on eBPF profiling of the pool implementation under load, the remaining overhead is well-distributed: Syscall profile: - futex: 902K calls, 211s total: significantly reduced from baseline’s 1.1M+ calls - sched_yield: 58K calls, 5.3s: intentional scheduler balancing (kept from earlier work) - File I/O (openat/newfstatat/close): ~2.8M operations, 11s: PHP script execution - nanosleep: 177K calls, 47s: timing operations Off-CPU profile shows time is now spent primarily on: - PHP memory allocation (_emalloc_192) and hash operations (zend_hash_index_find) - Go work-stealing and scheduling (normal runtime overhead) The Go-side dispatch optimisation in this PR has eliminated the primary bottleneck (futex contention on channels). The remaining time is spent on productive work (PHP execution) and unavoidable overhead (file I/O, timing). Future optimisation would need to focus on PHP internals, which are outside the scope of FrankenPHP’s Go dispatcher. * All benchmarks are done with significant tracing enabled and thus may be exaggerated under real workloads. --------- Signed-off-by: Robert Landers <landers.robert@gmail.com>
This commit is contained in:
@@ -13,8 +13,9 @@ import (
|
||||
type regularThread struct {
|
||||
contextHolder
|
||||
|
||||
state *threadState
|
||||
thread *phpThread
|
||||
state *threadState
|
||||
thread *phpThread
|
||||
workReady chan contextHolder // Channel to receive work directly
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -22,12 +23,14 @@ var (
|
||||
regularThreadMu = &sync.RWMutex{}
|
||||
regularRequestChan chan contextHolder
|
||||
regularSemaphore *semaphore.Weighted // FIFO admission control
|
||||
regularThreadPool sync.Pool // Pool of idle threads for direct handoff
|
||||
)
|
||||
|
||||
func convertToRegularThread(thread *phpThread) {
|
||||
thread.setHandler(®ularThread{
|
||||
thread: thread,
|
||||
state: thread.state,
|
||||
thread: thread,
|
||||
state: thread.state,
|
||||
workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender
|
||||
})
|
||||
attachRegularThread(thread)
|
||||
}
|
||||
@@ -77,13 +80,19 @@ func (handler *regularThread) waitForRequest() string {
|
||||
|
||||
handler.state.markAsWaiting(true)
|
||||
|
||||
var ch contextHolder
|
||||
// Put this thread in the pool for direct handoff
|
||||
regularThreadPool.Put(handler)
|
||||
|
||||
// Wait for work to be assigned (either via pool or fallback channel)
|
||||
var ch contextHolder
|
||||
select {
|
||||
case <-handler.thread.drainChan:
|
||||
// go back to beforeScriptExecution
|
||||
return handler.beforeScriptExecution()
|
||||
case ch = <-handler.workReady:
|
||||
// Work received via direct handoff from the pool
|
||||
case ch = <-regularRequestChan:
|
||||
// Fallback: work came via global channel
|
||||
}
|
||||
|
||||
handler.ctx = ch.ctx
|
||||
@@ -111,6 +120,18 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error {
|
||||
}
|
||||
defer regularSemaphore.Release(1)
|
||||
|
||||
// Fast path: try to get an idle thread from the pool
|
||||
if idle := regularThreadPool.Get(); idle != nil {
|
||||
handler := idle.(*regularThread)
|
||||
// Send work to the thread's dedicated channel
|
||||
handler.workReady <- ch
|
||||
metrics.DequeuedRequest()
|
||||
<-ch.frankenPHPContext.done
|
||||
metrics.StopRequest()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slow path: no idle thread in pool, use the global channel
|
||||
regularRequestChan <- ch
|
||||
metrics.DequeuedRequest()
|
||||
<-ch.frankenPHPContext.done
|
||||
|
||||
@@ -22,7 +22,8 @@ type workerThread struct {
|
||||
workerFrankenPHPContext *frankenPHPContext
|
||||
workerContext context.Context
|
||||
backoff *exponentialBackoff
|
||||
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
||||
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
||||
workReady chan contextHolder // Channel to receive work directly from pool
|
||||
}
|
||||
|
||||
func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
@@ -35,6 +36,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
minBackoff: 100 * time.Millisecond,
|
||||
maxConsecutiveFailures: worker.maxConsecutiveFailures,
|
||||
},
|
||||
workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender
|
||||
})
|
||||
worker.attachThread(thread)
|
||||
}
|
||||
@@ -210,6 +212,9 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
||||
|
||||
handler.state.markAsWaiting(true)
|
||||
|
||||
// Put this thread in the pool for direct handoff
|
||||
handler.worker.threadPool.Put(handler)
|
||||
|
||||
var requestCH contextHolder
|
||||
select {
|
||||
case <-handler.thread.drainChan:
|
||||
@@ -225,7 +230,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
||||
|
||||
return false, nil
|
||||
case requestCH = <-handler.thread.requestChan:
|
||||
// Fast path: direct thread affinity
|
||||
case requestCH = <-handler.workReady:
|
||||
// Medium path: pool handoff
|
||||
case requestCH = <-handler.worker.requestChan:
|
||||
// Slow path: global channel
|
||||
}
|
||||
|
||||
handler.workerContext = requestCH.ctx
|
||||
|
||||
31
worker.go
31
worker.go
@@ -25,6 +25,7 @@ type worker struct {
|
||||
semaphore *semaphore.Weighted
|
||||
threads []*phpThread
|
||||
threadMutex sync.RWMutex
|
||||
threadPool sync.Pool // Pool of idle worker threads for direct handoff
|
||||
allowPathMatching bool
|
||||
maxConsecutiveFailures int
|
||||
onThreadReady func(int)
|
||||
@@ -249,23 +250,7 @@ func (worker *worker) isAtThreadLimit() bool {
|
||||
func (worker *worker) handleRequest(ch contextHolder) error {
|
||||
metrics.StartWorkerRequest(worker.name)
|
||||
|
||||
// dispatch requests to all worker threads in order
|
||||
worker.threadMutex.RLock()
|
||||
for _, thread := range worker.threads {
|
||||
select {
|
||||
case thread.requestChan <- ch:
|
||||
worker.threadMutex.RUnlock()
|
||||
<-ch.frankenPHPContext.done
|
||||
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
|
||||
|
||||
return nil
|
||||
default:
|
||||
// thread is busy, continue
|
||||
}
|
||||
}
|
||||
worker.threadMutex.RUnlock()
|
||||
|
||||
// if no thread was available, mark the request as queued and use semaphore admission control
|
||||
// mark the request as queued and use admission control
|
||||
metrics.QueuedWorkerRequest(worker.name)
|
||||
|
||||
workerScaleChan := scaleChan
|
||||
@@ -281,6 +266,18 @@ func (worker *worker) handleRequest(ch contextHolder) error {
|
||||
}
|
||||
defer worker.semaphore.Release(1)
|
||||
|
||||
// Fast path: try to get an idle thread from the pool
|
||||
if idle := worker.threadPool.Get(); idle != nil {
|
||||
handler := idle.(*workerThread)
|
||||
// Direct handoff - send work to the thread's dedicated channel
|
||||
handler.workReady <- ch
|
||||
metrics.DequeuedWorkerRequest(worker.name)
|
||||
<-ch.frankenPHPContext.done
|
||||
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slow path: no idle thread in pool, use the global channel
|
||||
worker.requestChan <- ch
|
||||
metrics.DequeuedWorkerRequest(worker.name)
|
||||
<-ch.frankenPHPContext.done
|
||||
|
||||
Reference in New Issue
Block a user