Refactoring.

This commit is contained in:
Alliballibaba
2025-09-14 13:07:54 +02:00
parent e50593a11d
commit 57c2ed4378
4 changed files with 33 additions and 43 deletions

View File

@@ -8,15 +8,15 @@ import (
"time"
)
// limit of tracked path children
// hard limit of tracked paths
const maxTrackedPaths = 1000
// path parts longer than this are considered a wildcard
// path parts longer than this are considered a slug
const charLimitWildcard = 50
var (
// requests taking longer than this are considered slow (var for tests)
slowRequestThreshold = 1000 * time.Millisecond
slowRequestThreshold = 1500 * time.Millisecond
// % of autoscaled threads that are not marked as low latency (var for tests)
slowThreadPercentile = 40
@@ -33,7 +33,7 @@ func initLatencyTracking() {
}
// trigger latency tracking while scaling threads
func triggerLatencyTrackingIfNeeded(thread *phpThread) {
func triggerLatencyTracking(thread *phpThread) {
if isNearThreadLimit() {
latencyTrackingEnabled.Store(true)
thread.isLowLatencyThread = true
@@ -41,7 +41,7 @@ func triggerLatencyTrackingIfNeeded(thread *phpThread) {
}
}
func stopLatencyTrackingIfNeeded() {
func stopLatencyTracking() {
if latencyTrackingEnabled.Load() && !isNearThreadLimit() {
latencyTrackingEnabled.Store(false)
logger.Debug("latency tracking disabled")
@@ -98,7 +98,6 @@ func isHighLatencyRequest(fc *frankenPHPContext) bool {
return false
}
// TODO: query?
func normalizePath(path string) string {
pathLen := len(path)
if pathLen > 1 && path[pathLen-1] == '/' {
@@ -126,6 +125,7 @@ func normalizePath(path string) string {
// determine if a path part is a wildcard
func normalizePathPart(part string) string {
if len(part) > charLimitWildcard {
// TODO: better slug detection?
return ":slug"
}

View File

@@ -63,7 +63,7 @@ func addRegularThread() (*phpThread, error) {
if thread == nil {
return nil, ErrMaxThreadsReached
}
triggerLatencyTrackingIfNeeded(thread)
triggerLatencyTracking(thread)
convertToRegularThread(thread)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
@@ -74,7 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
if thread == nil {
return nil, ErrMaxThreadsReached
}
triggerLatencyTrackingIfNeeded(thread)
triggerLatencyTracking(thread)
convertToWorkerThread(thread, worker)
thread.state.waitFor(stateReady, stateShuttingDown, stateReserved)
return thread, nil
@@ -223,5 +223,5 @@ func deactivateThreads() {
// }
}
stopLatencyTrackingIfNeeded()
stopLatencyTracking()
}

View File

@@ -5,19 +5,21 @@ import (
"time"
)
// threadPool manages a pool of PHP threads
// used for both worker and regular threads
type threadPool struct {
threads []*phpThread
mu sync.RWMutex
ch chan *frankenPHPContext
fastChan chan *frankenPHPContext
threads []*phpThread
mu sync.RWMutex
ch chan *frankenPHPContext
lowLatencyChan chan *frankenPHPContext
}
func newThreadPool(capacity int) *threadPool {
return &threadPool{
threads: make([]*phpThread, 0, capacity),
mu: sync.RWMutex{},
ch: make(chan *frankenPHPContext),
fastChan: make(chan *frankenPHPContext),
threads: make([]*phpThread, 0, capacity),
mu: sync.RWMutex{},
ch: make(chan *frankenPHPContext),
lowLatencyChan: make(chan *frankenPHPContext),
}
}
@@ -38,17 +40,10 @@ func (p *threadPool) detach(thread *phpThread) {
p.mu.Unlock()
}
func (p *threadPool) len() int {
p.mu.RLock()
l := len(p.threads)
p.mu.RUnlock()
return l
}
// get the correct request chan for queued requests
func (p *threadPool) requestChan(thread *phpThread) chan *frankenPHPContext {
if thread.isLowLatencyThread {
return p.fastChan
return p.lowLatencyChan
}
return p.ch
}
@@ -72,32 +67,29 @@ func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool {
}
// dispatch request to all threads, triggering scaling or timeouts as needed
func (p *threadPool) queueRequest(fc *frankenPHPContext, isFastRequest bool) bool {
var fastChan chan *frankenPHPContext
if isFastRequest {
fastChan = p.fastChan
func (p *threadPool) queueRequest(fc *frankenPHPContext, isLowLatencyRequest bool) bool {
var lowLatencyChan chan *frankenPHPContext
if isLowLatencyRequest {
lowLatencyChan = p.lowLatencyChan
}
var timeoutChan <-chan time.Time
if maxWaitTime > 0 {
timeoutChan = time.After(maxWaitTime)
}
for {
select {
case p.ch <- fc:
return true
case fastChan <- fc:
return true
case lowLatencyChan <- fc:
return true // 'low laten'
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
case <-timeoutChan:
// the request has timed out stalling
fc.reject(504, "Gateway Timeout")
return false
}
}
}
func timeoutChan(timeout time.Duration) <-chan time.Time {
if timeout == 0 {
return nil
}
return time.After(timeout)
}

View File

@@ -13,9 +13,7 @@ type regularThread struct {
requestContext *frankenPHPContext
}
var (
regularThreadPool *threadPool
)
var regularThreadPool *threadPool
func initRegularPHPThreads(num int) {
regularThreadPool = newThreadPool(num)