diff --git a/threadregular.go b/threadregular.go
index 57bafdd3..854da730 100644
--- a/threadregular.go
+++ b/threadregular.go
@@ -13,8 +13,9 @@ import (
 
 type regularThread struct {
 	contextHolder
-	state  *threadState
-	thread *phpThread
+	state     *threadState
+	thread    *phpThread
+	workReady chan contextHolder // unbuffered: a send commits only while this thread is parked waiting
 }
 
 var (
@@ -22,12 +23,14 @@ var (
 	regularThreadMu    = &sync.RWMutex{}
 	regularRequestChan chan contextHolder
 	regularSemaphore   *semaphore.Weighted // FIFO admission control
+	regularThreadPool  sync.Pool           // idle threads for direct handoff; entries may be stale or GC-dropped, the global channel is the correctness fallback
 )
 
 func convertToRegularThread(thread *phpThread) {
 	thread.setHandler(&regularThread{
-		thread: thread,
-		state:  thread.state,
+		thread:    thread,
+		state:     thread.state,
+		workReady: make(chan contextHolder), // MUST be unbuffered: a buffered send would "succeed" against a thread that already left the wait, stranding the request
 	})
 	attachRegularThread(thread)
 }
@@ -77,13 +80,20 @@ func (handler *regularThread) waitForRequest() string {
 
 	handler.state.markAsWaiting(true)
 
-	var ch contextHolder
+	// Register in the idle pool. This entry can go stale — the thread may be
+	// drained or pick up work from the global channel below — which is safe
+	// because dispatchers only commit a handoff with a non-blocking send on
+	// the unbuffered workReady channel and discard entries whose send fails.
+	regularThreadPool.Put(handler)
+
+	var ch contextHolder
 	select {
 	case <-handler.thread.drainChan:
 		// go back to beforeScriptExecution
 		return handler.beforeScriptExecution()
+	case ch = <-handler.workReady:
+		// fast path: direct handoff from the idle pool
 	case ch = <-regularRequestChan:
+		// fallback: work came via the global channel
 	}
 
 	handler.ctx = ch.ctx
@@ -111,6 +121,29 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error {
 	}
 	defer regularSemaphore.Release(1)
 
+	// Fast path: hand the request directly to an idle thread. Pool entries
+	// can be stale (the thread may have been drained or taken work from the
+	// global channel since registering), so the handoff is a non-blocking
+	// send on the unbuffered workReady channel: it commits only if the
+	// thread is actually parked in waitForRequest right now. Stale entries
+	// are discarded and the next one is tried.
+	for {
+		idle := regularThreadPool.Get()
+		if idle == nil {
+			break
+		}
+		handler := idle.(*regularThread)
+		select {
+		case handler.workReady <- ch:
+			metrics.DequeuedRequest()
+			<-ch.frankenPHPContext.done
+			metrics.StopRequest()
+			return nil
+		default:
+			// stale pool entry: the thread is no longer waiting; try the next one
+		}
+	}
+
+	// Slow path: no idle thread could take the request, use the global channel
 	regularRequestChan <- ch
 	metrics.DequeuedRequest()
 	<-ch.frankenPHPContext.done
diff --git a/threadworker.go b/threadworker.go
index 09edc6ea..3bce21ba 100644
--- a/threadworker.go
+++ b/threadworker.go
@@ -22,7 +22,8 @@ type workerThread struct {
 	workerFrankenPHPContext *frankenPHPContext
 	workerContext           context.Context
 	backoff                 *exponentialBackoff
-	isBootingScript         bool // true if the worker has not reached frankenphp_handle_request yet
+	isBootingScript         bool               // true if the worker has not reached frankenphp_handle_request yet
+	workReady               chan contextHolder // unbuffered: a send commits only while this thread is parked waiting
 }
 
 func convertToWorkerThread(thread *phpThread, worker *worker) {
@@ -35,6 +36,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
 			minBackoff:             100 * time.Millisecond,
 			maxConsecutiveFailures: worker.maxConsecutiveFailures,
 		},
+		workReady: make(chan contextHolder), // MUST be unbuffered; see worker.handleRequest fast path
 	})
 	worker.attachThread(thread)
 }
@@ -210,6 +212,9 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
 
 	handler.state.markAsWaiting(true)
 
+	// Register in the idle pool; stale entries are safe because dispatchers
+	// only commit handoffs with a non-blocking send on unbuffered workReady.
+	handler.worker.threadPool.Put(handler)
+
 	var requestCH contextHolder
 	select {
 	case <-handler.thread.drainChan:
@@ -225,7 +231,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
 		return false, nil
 
 	case requestCH = <-handler.thread.requestChan:
+		// direct thread affinity
 	case requestCH = <-handler.workReady:
+		// direct handoff from the idle pool
 	case requestCH = <-handler.worker.requestChan:
+		// fallback: the worker's global channel
 	}
 
 	handler.workerContext = requestCH.ctx
diff --git a/worker.go b/worker.go
index 657cad76..fe0d8a0f 100644
--- a/worker.go
+++ b/worker.go
@@ -25,6 +25,7 @@ type worker struct {
 	semaphore              *semaphore.Weighted
 	threads                []*phpThread
 	threadMutex            sync.RWMutex
+	threadPool             sync.Pool // idle worker threads for direct handoff; entries may be stale or GC-dropped, worker.requestChan is the correctness fallback
 	allowPathMatching      bool
 	maxConsecutiveFailures int
 	onThreadReady          func(int)
@@ -249,23 +250,7 @@ func (worker *worker) isAtThreadLimit() bool {
 func (worker *worker) handleRequest(ch contextHolder) error {
 	metrics.StartWorkerRequest(worker.name)
 
-	// dispatch requests to all worker threads in order
-	worker.threadMutex.RLock()
-	for _, thread := range worker.threads {
-		select {
-		case thread.requestChan <- ch:
-			worker.threadMutex.RUnlock()
-			<-ch.frankenPHPContext.done
-			metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
-
-			return nil
-		default:
-			// thread is busy, continue
-		}
-	}
-	worker.threadMutex.RUnlock()
-
-	// if no thread was available, mark the request as queued and use semaphore admission control
+	// mark the request as queued and use admission control
 	metrics.QueuedWorkerRequest(worker.name)
 
 	workerScaleChan := scaleChan
@@ -281,6 +266,29 @@ func (worker *worker) handleRequest(ch contextHolder) error {
 	}
 	defer worker.semaphore.Release(1)
 
+	// Fast path: hand the request directly to an idle worker thread. Pool
+	// entries can be stale (the thread may have been drained or taken work
+	// from another channel since registering), so the handoff is a
+	// non-blocking send on the unbuffered workReady channel: it commits only
+	// if the thread is actually parked in waitForWorkerRequest right now.
+	// Stale entries are discarded and the next one is tried.
+	for {
+		idle := worker.threadPool.Get()
+		if idle == nil {
+			break
+		}
+		handler := idle.(*workerThread)
+		select {
+		case handler.workReady <- ch:
+			metrics.DequeuedWorkerRequest(worker.name)
+			<-ch.frankenPHPContext.done
+			metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
+			return nil
+		default:
+			// stale pool entry: the thread is no longer waiting; try the next one
+		}
+	}
+
+	// Slow path: no idle thread could take the request, use the global channel
 	worker.requestChan <- ch
 	metrics.DequeuedWorkerRequest(worker.name)
 	<-ch.frankenPHPContext.done