diff --git a/latencytracking.go b/latencytracking.go index cb1102f6..c84e7719 100644 --- a/latencytracking.go +++ b/latencytracking.go @@ -8,15 +8,15 @@ import ( "time" ) -// limit of tracked path children +// hard limit of tracked paths const maxTrackedPaths = 1000 -// path parts longer than this are considered a wildcard +// path parts longer than this are considered a slug const charLimitWildcard = 50 var ( // requests taking longer than this are considered slow (var for tests) - slowRequestThreshold = 1000 * time.Millisecond + slowRequestThreshold = 1500 * time.Millisecond // % of autoscaled threads that are not marked as low latency (var for tests) slowThreadPercentile = 40 @@ -33,7 +33,7 @@ func initLatencyTracking() { } // trigger latency tracking while scaling threads -func triggerLatencyTrackingIfNeeded(thread *phpThread) { +func triggerLatencyTracking(thread *phpThread) { if isNearThreadLimit() { latencyTrackingEnabled.Store(true) thread.isLowLatencyThread = true @@ -41,7 +41,7 @@ func triggerLatencyTrackingIfNeeded(thread *phpThread) { } } -func stopLatencyTrackingIfNeeded() { +func stopLatencyTracking() { if latencyTrackingEnabled.Load() && !isNearThreadLimit() { latencyTrackingEnabled.Store(false) logger.Debug("latency tracking disabled") @@ -98,7 +98,6 @@ func isHighLatencyRequest(fc *frankenPHPContext) bool { return false } -// TODO: query? func normalizePath(path string) string { pathLen := len(path) if pathLen > 1 && path[pathLen-1] == '/' { @@ -126,6 +125,7 @@ func normalizePath(path string) string { // determine if a path part is a wildcard func normalizePathPart(part string) string { if len(part) > charLimitWildcard { + // TODO: better slug detection? return ":slug" } diff --git a/scaling.go b/scaling.go index b96dff33..db0102f8 100644 --- a/scaling.go +++ b/scaling.go @@ -63,7 +63,7 @@ func addRegularThread() (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTrackingIfNeeded(thread) + triggerLatencyTracking(thread) convertToRegularThread(thread) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -74,7 +74,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - triggerLatencyTrackingIfNeeded(thread) + triggerLatencyTracking(thread) convertToWorkerThread(thread, worker) thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) return thread, nil @@ -223,5 +223,5 @@ func deactivateThreads() { // } } - stopLatencyTrackingIfNeeded() + stopLatencyTracking() } diff --git a/threadpool.go b/threadpool.go index 37424d7b..443fb8ef 100644 --- a/threadpool.go +++ b/threadpool.go @@ -5,19 +5,21 @@ import ( "time" ) +// threadPool manages a pool of PHP threads +// used for both worker and regular threads type threadPool struct { - threads []*phpThread - mu sync.RWMutex - ch chan *frankenPHPContext - fastChan chan *frankenPHPContext + threads []*phpThread + mu sync.RWMutex + ch chan *frankenPHPContext + lowLatencyChan chan *frankenPHPContext } func newThreadPool(capacity int) *threadPool { return &threadPool{ - threads: make([]*phpThread, 0, capacity), - mu: sync.RWMutex{}, - ch: make(chan *frankenPHPContext), - fastChan: make(chan *frankenPHPContext), + threads: make([]*phpThread, 0, capacity), + mu: sync.RWMutex{}, + ch: make(chan *frankenPHPContext), + lowLatencyChan: make(chan *frankenPHPContext), } } @@ -38,17 +40,10 @@ func (p *threadPool) detach(thread *phpThread) { p.mu.Unlock() } -func (p *threadPool) len() int { - p.mu.RLock() - l := len(p.threads) - p.mu.RUnlock() - return l -} - // get the correct request chan for queued requests func (p *threadPool) requestChan(thread *phpThread) chan *frankenPHPContext { if thread.isLowLatencyThread { - return p.fastChan + return p.lowLatencyChan } return p.ch } @@ -72,32 +67,29 @@ func (p *threadPool) dispatchRequest(fc *frankenPHPContext) bool { } // dispatch request to all threads, triggering scaling or timeouts as needed -func (p *threadPool) queueRequest(fc *frankenPHPContext, isFastRequest bool) bool { - var fastChan chan *frankenPHPContext - if isFastRequest { - fastChan = p.fastChan +func (p *threadPool) queueRequest(fc *frankenPHPContext, isLowLatencyRequest bool) bool { + var lowLatencyChan chan *frankenPHPContext + if isLowLatencyRequest { + lowLatencyChan = p.lowLatencyChan + } + + var timeoutChan <-chan time.Time + if maxWaitTime > 0 { + timeoutChan = time.After(maxWaitTime) } for { select { case p.ch <- fc: return true - case fastChan <- fc: - return true + case lowLatencyChan <- fc: + return true // 'low laten' case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread - case <-timeoutChan(maxWaitTime): + case <-timeoutChan: // the request has timed out stalling fc.reject(504, "Gateway Timeout") return false } } } - -func timeoutChan(timeout time.Duration) <-chan time.Time { - if timeout == 0 { - return nil - } - - return time.After(timeout) -} diff --git a/threadregular.go b/threadregular.go index 61571f6c..4d46c852 100644 --- a/threadregular.go +++ b/threadregular.go @@ -13,9 +13,7 @@ type regularThread struct { requestContext *frankenPHPContext } -var ( - regularThreadPool *threadPool -) +var regularThreadPool *threadPool func initRegularPHPThreads(num int) { regularThreadPool = newThreadPool(num)