mirror of
https://github.com/php/frankenphp.git
synced 2026-03-24 00:52:11 +01:00
perf: use hot worker threads when possible (#1126)
Co-authored-by: Alliballibaba <alliballibaba@gmail.com> Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
This commit is contained in:
committed by
GitHub
parent
e5ca97308e
commit
1c3ce114f6
@@ -478,15 +478,15 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
|
||||
fc.startedAt = time.Now()
|
||||
|
||||
isWorker := fc.responseWriter == nil
|
||||
isWorkerRequest := false
|
||||
|
||||
rc := requestChan
|
||||
// Detect if a worker is available to handle this request
|
||||
if !isWorker {
|
||||
if worker, ok := workers[fc.scriptFilename]; ok {
|
||||
isWorkerRequest = true
|
||||
metrics.StartWorkerRequest(fc.scriptFilename)
|
||||
rc = worker.requestChan
|
||||
worker.handleRequest(request)
|
||||
<-fc.done
|
||||
metrics.StopWorkerRequest(fc.scriptFilename, time.Since(fc.startedAt))
|
||||
return nil
|
||||
} else {
|
||||
metrics.StartRequest()
|
||||
}
|
||||
@@ -494,16 +494,12 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case rc <- request:
|
||||
case requestChan <- request:
|
||||
<-fc.done
|
||||
}
|
||||
|
||||
if !isWorker {
|
||||
if isWorkerRequest {
|
||||
metrics.StopWorkerRequest(fc.scriptFilename, time.Since(fc.startedAt))
|
||||
} else {
|
||||
metrics.StopRequest()
|
||||
}
|
||||
metrics.StopRequest()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -17,6 +17,7 @@ type phpThread struct {
|
||||
mainRequest *http.Request
|
||||
workerRequest *http.Request
|
||||
worker *worker
|
||||
requestChan chan *http.Request
|
||||
knownVariableKeys map[string]*C.zend_string
|
||||
}
|
||||
|
||||
|
||||
26
worker.go
26
worker.go
@@ -21,6 +21,8 @@ type worker struct {
|
||||
num int
|
||||
env PreparedEnv
|
||||
requestChan chan *http.Request
|
||||
threads []*phpThread
|
||||
threadMutex sync.RWMutex
|
||||
}
|
||||
|
||||
const maxWorkerErrorBackoff = 1 * time.Second
|
||||
@@ -44,6 +46,7 @@ func initWorkers(opt []workerOpt) error {
|
||||
|
||||
for _, o := range opt {
|
||||
worker, err := newWorker(o)
|
||||
worker.threads = make([]*phpThread, 0, o.num)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -189,6 +192,23 @@ func (worker *worker) startNewWorkerThread() {
|
||||
}
|
||||
}
|
||||
|
||||
func (worker *worker) handleRequest(r *http.Request) {
|
||||
worker.threadMutex.RLock()
|
||||
// dispatch requests to all worker threads in order
|
||||
for _, thread := range worker.threads {
|
||||
select {
|
||||
case thread.requestChan <- r:
|
||||
worker.threadMutex.RUnlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
worker.threadMutex.RUnlock()
|
||||
// if no thread was available, fan the request out to all threads
|
||||
// TODO: theoretically there could be autoscaling of threads here
|
||||
worker.requestChan <- r
|
||||
}
|
||||
|
||||
func stopWorkers() {
|
||||
workersAreDone.Store(true)
|
||||
close(workersDone)
|
||||
@@ -242,7 +262,10 @@ func assignThreadToWorker(thread *phpThread) {
|
||||
if !workersAreReady.Load() {
|
||||
workersReadyWG.Done()
|
||||
}
|
||||
// TODO: we can also store all threads assigned to the worker if needed
|
||||
thread.requestChan = make(chan *http.Request)
|
||||
worker.threadMutex.Lock()
|
||||
worker.threads = append(worker.threads, thread)
|
||||
worker.threadMutex.Unlock()
|
||||
}
|
||||
|
||||
//export go_frankenphp_worker_handle_request_start
|
||||
@@ -269,6 +292,7 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
|
||||
|
||||
return C.bool(false)
|
||||
case r = <-thread.worker.requestChan:
|
||||
case r = <-thread.requestChan:
|
||||
}
|
||||
|
||||
thread.workerRequest = r
|
||||
|
||||
Reference in New Issue
Block a user