From 9822b3842e2f7b7eea950e939182e2e40a11b080 Mon Sep 17 00:00:00 2001 From: Rob Landers Date: Sun, 23 Nov 2025 20:39:07 +0100 Subject: [PATCH] Use sync.Pool to remove channel contention (#2025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds `sync.Pool`-based direct handoff (instead of channel dispatch) to both regular and worker threads, building on the semaphore-based FIFO approach in `fix/latency`. The pool eliminates the remaining channel contention and provides significant tail latency improvements, particularly for workers. By using `sync.Pool`, we achieve a lock-free per-P dispatch, reducing contention dramatically. The dispatcher tries the pool first and then falls back to the semaphore implementation. Benchmark Results Workers (8 threads, 500 connections, 15s) | Configuration | Req/sec | P50 | P75 | P90 | P99 | Threads | Global RQ | |----------------|---------|--------|--------|---------|---------|---------|-----------| | latency branch | 78,161 | 6.03ms | 8.08ms | 11.19ms | 18.38ms | 45 | 145 | | pool (this PR) | 76,259 | 6.06ms | 7.45ms | 9.03ms | 12.99ms | 46 | 217 | | Improvement | -2.4% | +0.5% | -7.8% | -19.3% | -29.3% | +1 | +49.7% | Regular Threads (8 threads, 500 connections, 15s) | Configuration | Req/sec | P50 | P75 | P90 | P99 | Threads | Global RQ | |----------------|---------|---------|---------|---------|---------|---------|-----------| | latency branch | 42,096 | 11.46ms | 12.35ms | 13.35ms | 17.18ms | 62 | 3 | | pool (this PR) | 42,592 | 11.40ms | 12.26ms | 13.27ms | 15.95ms | 67 | 11 | | Improvement | +1.2% | -0.5% | -0.7% | -0.6% | -7.2% | +5 | +267% | Low Load (10 connections, 1 thread, 15s) - Regular Threads only, to show no difference | Configuration | Req/sec | P50 | P99 | |----------------|---------|-------|-------| | baseline (main) | 38,354 | 222µs | 650µs | | latency branch | 37,977 | 225µs | 657µs | | pool (this PR) | 38,584 | 222µs | 643µs | | Improvement | +1.6% | -1.3% | -2.1% | ## Thread Affinity 
Tradeoff Workers previously had a low-index affinity and would target the same threads under a low load (t[0], t[1], etc.). This minimised resource initialisation when frameworks lazily created resources (e.g. database connections). The new behaviour in this PR uses `sync.Pool`, which relies on per-P (processor) locality, distributing requests across multiple threads even under low loads. I think this is actually a good thing, as the previous behaviour is actively dangerous in production scenarios. Consider a scenario with 120 worker threads for an I/O-heavy workload (this is actually our advice in multiple issues). Under normal load, maybe only t[0-80] are usually active, and thus only 80 database connections are open. On a Black Friday, the load spikes, and we activate t[101] for the first time, but it exceeds a database connection limit of 100, causing cascading failures during peak loads. This is the worst possible time to discover resource limits. With `sync.Pool`, a normal load eventually cycles through all 120 workers, ensuring no surprises under load. It’s worth noting that with per-P locality there’s a high probability you’ll get the same worker on the same connection, keeping various caches warm (CPU L1/L2/L3, etc.). This is actually probably better than the low-index affinity for cache efficiency. ## Further improvements Further improvements will result in diminishing returns. 
Based on eBPF profiling of the pool implementation under load, the remaining overhead is well-distributed: Syscall profile: - futex: 902K calls, 211s total: significantly reduced from baseline’s 1.1M+ calls - sched_yield: 58K calls, 5.3s: intentional scheduler balancing (kept from earlier work) - File I/O (openat/newfstatat/close): ~2.8M operations, 11s: PHP script execution - nanosleep: 177K calls, 47s: timing operations Off-CPU profile shows time is now spent primarily on: - PHP memory allocation (_emalloc_192) and hash operations (zend_hash_index_find) - Go work-stealing and scheduling (normal runtime overhead) The Go-side dispatch optimisation in this PR has eliminated the primary bottleneck (futex contention on channels). The remaining time is spent on productive work (PHP execution) and unavoidable overhead (file I/O, timing). Future optimisation would need to focus on PHP internals, which are outside the scope of FrankenPHP’s Go dispatcher. * All benchmarks are done with significant tracing enabled and thus may be exaggerated under real workloads. 
--------- Signed-off-by: Robert Landers --- threadregular.go | 31 ++++++++++++++++++++++++++----- threadworker.go | 11 ++++++++++- worker.go | 31 ++++++++++++++----------------- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/threadregular.go b/threadregular.go index 57bafdd3..854da730 100644 --- a/threadregular.go +++ b/threadregular.go @@ -13,8 +13,9 @@ import ( type regularThread struct { contextHolder - state *threadState - thread *phpThread + state *threadState + thread *phpThread + workReady chan contextHolder // Channel to receive work directly } var ( @@ -22,12 +23,14 @@ var ( regularThreadMu = &sync.RWMutex{} regularRequestChan chan contextHolder regularSemaphore *semaphore.Weighted // FIFO admission control + regularThreadPool sync.Pool // Pool of idle threads for direct handoff ) func convertToRegularThread(thread *phpThread) { thread.setHandler(®ularThread{ - thread: thread, - state: thread.state, + thread: thread, + state: thread.state, + workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender }) attachRegularThread(thread) } @@ -77,13 +80,19 @@ func (handler *regularThread) waitForRequest() string { handler.state.markAsWaiting(true) - var ch contextHolder + // Put this thread in the pool for direct handoff + regularThreadPool.Put(handler) + // Wait for work to be assigned (either via pool or fallback channel) + var ch contextHolder select { case <-handler.thread.drainChan: // go back to beforeScriptExecution return handler.beforeScriptExecution() + case ch = <-handler.workReady: + // Work received via direct handoff from the pool case ch = <-regularRequestChan: + // Fallback: work came via global channel } handler.ctx = ch.ctx @@ -111,6 +120,18 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { } defer regularSemaphore.Release(1) + // Fast path: try to get an idle thread from the pool + if idle := regularThreadPool.Get(); idle != nil { + handler := idle.(*regularThread) + // Send work to the 
thread's dedicated channel + handler.workReady <- ch + metrics.DequeuedRequest() + <-ch.frankenPHPContext.done + metrics.StopRequest() + return nil + } + + // Slow path: no idle thread in pool, use the global channel regularRequestChan <- ch metrics.DequeuedRequest() <-ch.frankenPHPContext.done diff --git a/threadworker.go b/threadworker.go index 09edc6ea..3bce21ba 100644 --- a/threadworker.go +++ b/threadworker.go @@ -22,7 +22,8 @@ type workerThread struct { workerFrankenPHPContext *frankenPHPContext workerContext context.Context backoff *exponentialBackoff - isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet + isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet + workReady chan contextHolder // Channel to receive work directly from pool } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -35,6 +36,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) { minBackoff: 100 * time.Millisecond, maxConsecutiveFailures: worker.maxConsecutiveFailures, }, + workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender }) worker.attachThread(thread) } @@ -210,6 +212,9 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { handler.state.markAsWaiting(true) + // Put this thread in the pool for direct handoff + handler.worker.threadPool.Put(handler) + var requestCH contextHolder select { case <-handler.thread.drainChan: @@ -225,7 +230,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { return false, nil case requestCH = <-handler.thread.requestChan: + // Fast path: direct thread affinity + case requestCH = <-handler.workReady: + // Medium path: pool handoff case requestCH = <-handler.worker.requestChan: + // Slow path: global channel } handler.workerContext = requestCH.ctx diff --git a/worker.go b/worker.go index 657cad76..fe0d8a0f 100644 --- a/worker.go +++ b/worker.go @@ -25,6 +25,7 @@ type worker struct { semaphore 
*semaphore.Weighted threads []*phpThread threadMutex sync.RWMutex + threadPool sync.Pool // Pool of idle worker threads for direct handoff allowPathMatching bool maxConsecutiveFailures int onThreadReady func(int) @@ -249,23 +250,7 @@ func (worker *worker) isAtThreadLimit() bool { func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name) - // dispatch requests to all worker threads in order - worker.threadMutex.RLock() - for _, thread := range worker.threads { - select { - case thread.requestChan <- ch: - worker.threadMutex.RUnlock() - <-ch.frankenPHPContext.done - metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) - - return nil - default: - // thread is busy, continue - } - } - worker.threadMutex.RUnlock() - - // if no thread was available, mark the request as queued and use semaphore admission control + // mark the request as queued and use admission control metrics.QueuedWorkerRequest(worker.name) workerScaleChan := scaleChan @@ -281,6 +266,18 @@ func (worker *worker) handleRequest(ch contextHolder) error { } defer worker.semaphore.Release(1) + // Fast path: try to get an idle thread from the pool + if idle := worker.threadPool.Get(); idle != nil { + handler := idle.(*workerThread) + // Direct handoff - send work to the thread's dedicated channel + handler.workReady <- ch + metrics.DequeuedWorkerRequest(worker.name) + <-ch.frankenPHPContext.done + metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) + return nil + } + + // Slow path: no idle thread in pool, use the global channel worker.requestChan <- ch metrics.DequeuedWorkerRequest(worker.name) <-ch.frankenPHPContext.done