fix: only drain workers on graceful shutdown (#1405)

* Only drains workers on shutdown.

* trigger build

* Marks func as experimental.

---------

Co-authored-by: Alliballibaba <alliballibaba@gmail.com>
This commit is contained in:
Alexander Stecher
2025-02-28 12:10:00 +01:00
committed by GitHub
parent 619c903386
commit 3ba4e257a1
4 changed files with 31 additions and 19 deletions

View File

@@ -113,7 +113,7 @@ func (f *FrankenPHPApp) Stop() error {
// note: Exiting() is currently marked as 'experimental'
// https://github.com/caddyserver/caddy/blob/e76405d55058b0a3e5ba222b44b5ef00516116aa/caddy.go#L810
if caddy.Exiting() {
frankenphp.Shutdown()
frankenphp.DrainWorkers()
}
// reset configuration so it doesn't bleed into later tests

View File

@@ -374,7 +374,7 @@ func Shutdown() {
return
}
drainWorkers()
drainWatcher()
drainAutoScaling()
drainPHPThreads()

View File

@@ -380,9 +380,9 @@ func (m *PrometheusMetrics) Shutdown() {
}
if err := m.registry.Register(m.queueDepth); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
func getWorkerNameForMetrics(name string) string {
@@ -432,9 +432,9 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
}
if err := m.registry.Register(m.queueDepth); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
return m
}

View File

@@ -86,18 +86,14 @@ func newWorker(o workerOpt) (*worker, error) {
return w, nil
}
func drainWorkers() {
watcher.DrainWatcher()
// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
func DrainWorkers() {
_ = drainWorkerThreads()
}
// RestartWorkers attempts to restart all workers gracefully
func RestartWorkers() {
// disallow scaling threads while restarting workers
scalingMu.Lock()
defer scalingMu.Unlock()
func drainWorkerThreads() []*phpThread {
ready := sync.WaitGroup{}
threadsToRestart := make([]*phpThread, 0)
drainedThreads := make([]*phpThread, 0)
for _, worker := range workers {
worker.threadMutex.RLock()
ready.Add(len(worker.threads))
@@ -108,7 +104,7 @@ func RestartWorkers() {
continue
}
close(thread.drainChan)
threadsToRestart = append(threadsToRestart, thread)
drainedThreads = append(drainedThreads, thread)
go func(thread *phpThread) {
thread.state.waitFor(stateYielding)
ready.Done()
@@ -116,9 +112,25 @@ func RestartWorkers() {
}
worker.threadMutex.RUnlock()
}
ready.Wait()
return drainedThreads
}
func drainWatcher() {
if watcherIsEnabled {
watcher.DrainWatcher()
}
}
// RestartWorkers attempts to restart all workers gracefully
func RestartWorkers() {
// disallow scaling threads while restarting workers
scalingMu.Lock()
defer scalingMu.Unlock()
threadsToRestart := drainWorkerThreads()
for _, thread := range threadsToRestart {
thread.drainChan = make(chan struct{})
thread.state.set(stateReady)