feat: introduces worker name option, use label on worker metrics instead (#1376)

* add worker name option and use it in logs and metrics, update tests

* fix missing reference for collector

* update tests

* update docs

* fix conflict

* add missing allowedDirectives

* update tests
This commit is contained in:
Indra Gunawan
2025-03-22 19:32:59 +08:00
committed by GitHub
parent 3bc426482a
commit 87315a19ae
17 changed files with 719 additions and 329 deletions

View File

@@ -45,6 +45,8 @@ func init() {
}
type workerConfig struct {
// Name for the worker
Name string `json:"name,omitempty"`
// FileName sets the path to the worker script.
FileName string `json:"file_name,omitempty"`
// Num sets the number of workers to start.
@@ -99,7 +101,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
}
for _, w := range f.Workers {
opts = append(opts, frankenphp.WithWorkers(repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch))
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch))
}
frankenphp.Shutdown()
@@ -234,6 +236,11 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
for d.NextBlock(1) {
v := d.Val()
switch v {
case "name":
if !d.NextArg() {
return d.ArgErr()
}
wc.Name = d.Val()
case "file":
if !d.NextArg() {
return d.ArgErr()
@@ -267,17 +274,26 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
wc.Watch = append(wc.Watch, d.Val())
}
default:
allowedDirectives := "file, num, env, watch"
allowedDirectives := "name, file, num, env, watch"
return wrongSubDirectiveError("worker", allowedDirectives, v)
}
}
if wc.FileName == "" {
return errors.New(`the "file" argument must be specified`)
}
if wc.FileName == "" {
return errors.New(`the "file" argument must be specified`)
}
if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) {
wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName)
if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) {
wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName)
}
if wc.Name == "" {
// let worker initialization validate if the FileName is valid or not
name, _ := fastabs.FastAbs(wc.FileName)
if name == "" {
name = wc.FileName
}
wc.Name = name
}
f.Workers = append(f.Workers, wc)

View File

@@ -11,12 +11,12 @@ import (
"sync/atomic"
"testing"
"github.com/dunglas/frankenphp"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddytest"
"github.com/dunglas/frankenphp"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
var testPort = "9080"
@@ -370,17 +370,13 @@ func TestMetrics(t *testing.T) {
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
if err != nil {
t.Fatalf("failed to fetch metrics: %v", err)
}
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
if err != nil {
t.Fatalf("failed to read metrics: %v", err)
}
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
@@ -432,6 +428,8 @@ func TestWorkerMetrics(t *testing.T) {
}
`, "caddyfile")
workerName, _ := fastabs.FastAbs("../testdata/index.php")
// Make some requests
for i := 0; i < 10; i++ {
wg.Add(1)
@@ -444,17 +442,13 @@ func TestWorkerMetrics(t *testing.T) {
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
if err != nil {
t.Fatalf("failed to fetch metrics: %v", err)
}
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
if err != nil {
t.Fatalf("failed to read metrics: %v", err)
}
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
@@ -468,29 +462,21 @@ func TestWorkerMetrics(t *testing.T) {
# TYPE frankenphp_busy_threads gauge
frankenphp_busy_threads 2
# HELP frankenphp_testdata_index_php_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_testdata_index_php_busy_workers gauge
frankenphp_testdata_index_php_busy_workers 0
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
frankenphp_busy_workers{worker="` + workerName + `"} 0
# HELP frankenphp_testdata_index_php_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_testdata_index_php_total_workers gauge
frankenphp_testdata_index_php_total_workers 2
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
frankenphp_total_workers{worker="` + workerName + `"} 2
# HELP frankenphp_testdata_index_php_worker_request_count
# TYPE frankenphp_testdata_index_php_worker_request_count counter
frankenphp_testdata_index_php_worker_request_count 10
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
frankenphp_worker_request_count{worker="` + workerName + `"} 10
# HELP frankenphp_testdata_index_php_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_testdata_index_php_ready_workers gauge
frankenphp_testdata_index_php_ready_workers 2
# HELP frankenphp_testdata_index_php_worker_crashes Number of PHP worker crashes for this worker
# TYPE frankenphp_testdata_index_php_worker_crashes counter
frankenphp_testdata_index_php_worker_crashes 0
# HELP frankenphp_testdata_index_php_worker_restarts Number of PHP worker restarts for this worker
# TYPE frankenphp_testdata_index_php_worker_restarts counter
frankenphp_testdata_index_php_worker_restarts 0
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
frankenphp_ready_workers{worker="` + workerName + `"} 2
`
ctx := caddy.ActiveContext()
@@ -500,15 +486,105 @@ func TestWorkerMetrics(t *testing.T) {
strings.NewReader(expectedMetrics),
"frankenphp_total_threads",
"frankenphp_busy_threads",
"frankenphp_testdata_index_php_busy_workers",
"frankenphp_testdata_index_php_total_workers",
"frankenphp_testdata_index_php_worker_request_count",
"frankenphp_testdata_index_php_worker_crashes",
"frankenphp_testdata_index_php_worker_restarts",
"frankenphp_testdata_index_php_ready_workers",
"frankenphp_busy_workers",
"frankenphp_total_workers",
"frankenphp_worker_request_count",
"frankenphp_ready_workers",
))
}
// TestNamedWorkerMetrics verifies that when a worker has an explicit "name"
// directive in the Caddyfile, the Prometheus worker metrics are labeled with
// that name (worker="my_app") rather than the worker script path.
func TestNamedWorkerMetrics(t *testing.T) {
var wg sync.WaitGroup
tester := caddytest.NewTester(t)
// Single worker named "my_app" running 2 instances of index.php.
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
https_port 9443
frankenphp {
worker {
name my_app
file ../testdata/index.php
num 2
}
}
}
localhost:`+testPort+` {
route {
php {
root ../testdata
}
}
}
`, "caddyfile")
// Make some requests
// 10 concurrent requests, so frankenphp_worker_request_count should be 10.
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
tester.AssertGetResponse(fmt.Sprintf("http://localhost:"+testPort+"/index.php?i=%d", i), http.StatusOK, fmt.Sprintf("I am by birth a Genevese (%d)", i))
wg.Done()
}(i)
}
wg.Wait()
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
// Check metrics
// Every worker-scoped metric below is expected to carry worker="my_app".
expectedMetrics := `
# HELP frankenphp_total_threads Total number of PHP threads
# TYPE frankenphp_total_threads counter
frankenphp_total_threads ` + cpus + `
# HELP frankenphp_busy_threads Number of busy PHP threads
# TYPE frankenphp_busy_threads gauge
frankenphp_busy_threads 2
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
frankenphp_busy_workers{worker="my_app"} 0
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
frankenphp_total_workers{worker="my_app"} 2
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
frankenphp_worker_request_count{worker="my_app"} 10
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
frankenphp_ready_workers{worker="my_app"} 2
`
ctx := caddy.ActiveContext()
// Compare only the listed metric families against the expected exposition text.
require.NoError(t,
testutil.GatherAndCompare(
ctx.GetMetricsRegistry(),
strings.NewReader(expectedMetrics),
"frankenphp_total_threads",
"frankenphp_busy_threads",
"frankenphp_busy_workers",
"frankenphp_total_workers",
"frankenphp_worker_request_count",
"frankenphp_ready_workers",
),
)
}
func TestAutoWorkerConfig(t *testing.T) {
var wg sync.WaitGroup
tester := caddytest.NewTester(t)
@@ -533,6 +609,8 @@ func TestAutoWorkerConfig(t *testing.T) {
}
`, "caddyfile")
workerName, _ := fastabs.FastAbs("../testdata/index.php")
// Make some requests
for i := 0; i < 10; i++ {
wg.Add(1)
@@ -545,17 +623,13 @@ func TestAutoWorkerConfig(t *testing.T) {
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
if err != nil {
t.Fatalf("failed to fetch metrics: %v", err)
}
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
if err != nil {
t.Fatalf("failed to read metrics: %v", err)
}
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
workers := fmt.Sprintf("%d", frankenphp.MaxThreads-1)
@@ -570,29 +644,21 @@ func TestAutoWorkerConfig(t *testing.T) {
# TYPE frankenphp_busy_threads gauge
frankenphp_busy_threads ` + workers + `
# HELP frankenphp_testdata_index_php_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_testdata_index_php_busy_workers gauge
frankenphp_testdata_index_php_busy_workers 0
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
frankenphp_busy_workers{worker="` + workerName + `"} 0
# HELP frankenphp_testdata_index_php_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_testdata_index_php_total_workers gauge
frankenphp_testdata_index_php_total_workers ` + workers + `
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
frankenphp_total_workers{worker="` + workerName + `"} ` + workers + `
# HELP frankenphp_testdata_index_php_worker_request_count
# TYPE frankenphp_testdata_index_php_worker_request_count counter
frankenphp_testdata_index_php_worker_request_count 10
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
frankenphp_worker_request_count{worker="` + workerName + `"} 10
# HELP frankenphp_testdata_index_php_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_testdata_index_php_ready_workers gauge
frankenphp_testdata_index_php_ready_workers ` + workers + `
# HELP frankenphp_testdata_index_php_worker_crashes Number of PHP worker crashes for this worker
# TYPE frankenphp_testdata_index_php_worker_crashes counter
frankenphp_testdata_index_php_worker_crashes 0
# HELP frankenphp_testdata_index_php_worker_restarts Number of PHP worker restarts for this worker
# TYPE frankenphp_testdata_index_php_worker_restarts counter
frankenphp_testdata_index_php_worker_restarts 0
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
frankenphp_ready_workers{worker="` + workerName + `"} ` + workers + `
`
ctx := caddy.ActiveContext()
@@ -602,12 +668,10 @@ func TestAutoWorkerConfig(t *testing.T) {
strings.NewReader(expectedMetrics),
"frankenphp_total_threads",
"frankenphp_busy_threads",
"frankenphp_testdata_index_php_busy_workers",
"frankenphp_testdata_index_php_total_workers",
"frankenphp_testdata_index_php_worker_request_count",
"frankenphp_testdata_index_php_worker_crashes",
"frankenphp_testdata_index_php_worker_restarts",
"frankenphp_testdata_index_php_ready_workers",
"frankenphp_busy_workers",
"frankenphp_total_workers",
"frankenphp_worker_request_count",
"frankenphp_ready_workers",
))
}
@@ -799,3 +863,213 @@ func getStatusCode(url string, t *testing.T) int {
defer resp.Body.Close()
return resp.StatusCode
}
// TestMultiWorkersMetrics verifies that two workers with distinct names
// ("service1" and "service2") are reported as separate label values on the
// shared worker metric families.
func TestMultiWorkersMetrics(t *testing.T) {
var wg sync.WaitGroup
tester := caddytest.NewTester(t)
// Two named workers: service1 (index.php, 2 instances) and
// service2 (ini.php, 3 instances).
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
https_port 9443
frankenphp {
worker {
name service1
file ../testdata/index.php
num 2
}
worker {
name service2
file ../testdata/ini.php
num 3
}
}
}
localhost:`+testPort+` {
route {
php {
root ../testdata
}
}
}
example.com:`+testPort+` {
route {
php {
root ../testdata
}
}
}
`, "caddyfile")
// Make some requests
// All 10 requests hit index.php, i.e. only service1 serves traffic.
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
tester.AssertGetResponse(fmt.Sprintf("http://localhost:"+testPort+"/index.php?i=%d", i), http.StatusOK, fmt.Sprintf("I am by birth a Genevese (%d)", i))
wg.Done()
}(i)
}
wg.Wait()
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
// Check metrics
// total/ready report both label values (2 for service1, 3 for service2);
// busy_workers and request_count list only service1, which handled the requests.
expectedMetrics := `
# HELP frankenphp_total_threads Total number of PHP threads
# TYPE frankenphp_total_threads counter
frankenphp_total_threads ` + cpus + `
# HELP frankenphp_busy_threads Number of busy PHP threads
# TYPE frankenphp_busy_threads gauge
frankenphp_busy_threads 5
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
frankenphp_busy_workers{worker="service1"} 0
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
frankenphp_total_workers{worker="service1"} 2
frankenphp_total_workers{worker="service2"} 3
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
frankenphp_worker_request_count{worker="service1"} 10
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
frankenphp_ready_workers{worker="service1"} 2
frankenphp_ready_workers{worker="service2"} 3
`
ctx := caddy.ActiveContext()
// Compare only the listed metric families against the expected exposition text.
require.NoError(t,
testutil.GatherAndCompare(
ctx.GetMetricsRegistry(),
strings.NewReader(expectedMetrics),
"frankenphp_total_threads",
"frankenphp_busy_threads",
"frankenphp_busy_workers",
"frankenphp_total_workers",
"frankenphp_worker_request_count",
"frankenphp_ready_workers",
))
}
// TestMultiWorkersMetricsWithDuplicateName verifies that two workers sharing
// the same name ("service1") are aggregated under a single label value:
// their worker counts (2 + 3) add up in the same time series.
func TestMultiWorkersMetricsWithDuplicateName(t *testing.T) {
var wg sync.WaitGroup
tester := caddytest.NewTester(t)
// Two workers deliberately configured with the same name "service1".
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`
https_port 9443
frankenphp {
worker {
name service1
file ../testdata/index.php
num 2
}
worker {
name service1
file ../testdata/ini.php
num 3
}
}
}
localhost:`+testPort+` {
route {
php {
root ../testdata
}
}
}
example.com:`+testPort+` {
route {
php {
root ../testdata
}
}
}
`, "caddyfile")
// Make some requests
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
tester.AssertGetResponse(fmt.Sprintf("http://localhost:"+testPort+"/index.php?i=%d", i), http.StatusOK, fmt.Sprintf("I am by birth a Genevese (%d)", i))
wg.Done()
}(i)
}
wg.Wait()
// Fetch metrics
resp, err := http.Get("http://localhost:2999/metrics")
require.NoError(t, err, "failed to fetch metrics")
defer resp.Body.Close()
// Read and parse metrics
metrics := new(bytes.Buffer)
_, err = metrics.ReadFrom(resp.Body)
require.NoError(t, err, "failed to read metrics")
cpus := fmt.Sprintf("%d", frankenphp.MaxThreads)
// Check metrics
// Both worker pools share worker="service1", so total/ready show the
// combined count of 5 instead of separate 2 and 3 series.
expectedMetrics := `
# HELP frankenphp_total_threads Total number of PHP threads
# TYPE frankenphp_total_threads counter
frankenphp_total_threads ` + cpus + `
# HELP frankenphp_busy_threads Number of busy PHP threads
# TYPE frankenphp_busy_threads gauge
frankenphp_busy_threads 5
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
frankenphp_busy_workers{worker="service1"} 0
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
frankenphp_total_workers{worker="service1"} 5
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
frankenphp_worker_request_count{worker="service1"} 10
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
frankenphp_ready_workers{worker="service1"} 5
`
ctx := caddy.ActiveContext()
// Compare only the listed metric families against the expected exposition text.
require.NoError(t,
testutil.GatherAndCompare(
ctx.GetMetricsRegistry(),
strings.NewReader(expectedMetrics),
"frankenphp_total_threads",
"frankenphp_busy_threads",
"frankenphp_busy_workers",
"frankenphp_total_workers",
"frankenphp_worker_request_count",
"frankenphp_ready_workers",
))
}

View File

@@ -58,6 +58,7 @@ Optionally, the number of threads to create and [worker scripts](worker.md) to s
num <num> # Sets the number of PHP threads to start, defaults to 2x the number of available CPUs.
env <key> <value> # Sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables.
watch <path> # Sets the path to watch for file changes. Can be specified more than once for multiple paths.
name <name> # Sets the name of the worker, used in logs and metrics. Defaults to the absolute path of the worker file.
}
}
}

View File

@@ -2,14 +2,16 @@
When [Caddy metrics](https://caddyserver.com/docs/metrics) are enabled, FrankenPHP exposes the following metrics:
* `frankenphp_[worker]_total_workers`: The total number of workers.
* `frankenphp_[worker]_busy_workers`: The number of workers currently processing a request.
* `frankenphp_[worker]_worker_request_time`: The time spent processing requests by all workers.
* `frankenphp_[worker]_worker_request_count`: The number of requests processed by all workers.
* `frankenphp_[worker]_ready_workers`: The number of workers that have called `frankenphp_handle_request` at least once.
* `frankenphp_[worker]_worker_crashes`: The number of times a worker has unexpectedly terminated.
* `frankenphp_[worker]_worker_restarts`: The number of times a worker has been deliberately restarted.
* `frankenphp_total_threads`: The total number of PHP threads.
* `frankenphp_busy_threads`: The number of PHP threads currently processing a request (running workers always consume a thread).
* `frankenphp_queue_depth`: The number of regular queued requests
* `frankenphp_total_workers{worker="[worker_name]"}`: The total number of workers.
* `frankenphp_busy_workers{worker="[worker_name]"}`: The number of workers currently processing a request.
* `frankenphp_worker_request_time{worker="[worker_name]"}`: The time spent processing requests by all workers.
* `frankenphp_worker_request_count{worker="[worker_name]"}`: The number of requests processed by all workers.
* `frankenphp_ready_workers{worker="[worker_name]"}`: The number of workers that have called `frankenphp_handle_request` at least once.
* `frankenphp_worker_crashes{worker="[worker_name]"}`: The number of times a worker has unexpectedly terminated.
* `frankenphp_worker_restarts{worker="[worker_name]"}`: The number of times a worker has been deliberately restarted.
* `frankenphp_worker_queue_depth{worker="[worker_name]"}`: The number of queued requests.
For worker metrics, the `[worker]` placeholder is replaced by the worker script path in the Caddyfile.
For worker metrics, the `[worker_name]` placeholder is replaced by the worker name set in the Caddyfile; if no name is set, the absolute path of the worker file is used instead.

View File

@@ -160,7 +160,7 @@ func calculateMaxThreads(opt *opt) (int, int, int, error) {
// https://github.com/dunglas/frankenphp/issues/126
opt.workers[i].num = maxProcs
}
metrics.TotalWorkers(w.fileName, w.num)
metrics.TotalWorkers(w.name, w.num)
numWorkers += opt.workers[i].num
}

View File

@@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"github.com/dunglas/frankenphp/internal/fastabs"
"io"
"log"
"mime/multipart"
@@ -28,6 +27,7 @@ import (
"testing"
"github.com/dunglas/frankenphp"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@@ -65,7 +65,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), *
initOpts := []frankenphp.Option{frankenphp.WithLogger(opts.logger)}
if opts.workerScript != "" {
initOpts = append(initOpts, frankenphp.WithWorkers(testDataDir+opts.workerScript, opts.nbWorkers, opts.env, opts.watch))
initOpts = append(initOpts, frankenphp.WithWorkers("workerName", testDataDir+opts.workerScript, opts.nbWorkers, opts.env, opts.watch))
}
initOpts = append(initOpts, opts.initOpts...)
if opts.phpIni != nil {

1
go.mod
View File

@@ -20,6 +20,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/gammazero/deque v1.0.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect

2
go.sum
View File

@@ -16,6 +16,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/maypok86/otter v1.2.4 h1:HhW1Pq6VdJkmWwcZZq19BlEQkHtI8xgsQzBVXJU0nfc=
github.com/maypok86/otter v1.2.4/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=

View File

@@ -2,18 +2,12 @@ package frankenphp
import (
"errors"
"regexp"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/prometheus/client_golang/prometheus"
)
var metricsNameRegex = regexp.MustCompile(`\W+`)
var metricsNameFixRegex = regexp.MustCompile(`^_+|_+$`)
const (
StopReasonCrash = iota
StopReasonRestart
@@ -91,172 +85,163 @@ type PrometheusMetrics struct {
registry prometheus.Registerer
totalThreads prometheus.Counter
busyThreads prometheus.Gauge
totalWorkers map[string]prometheus.Gauge
busyWorkers map[string]prometheus.Gauge
readyWorkers map[string]prometheus.Gauge
workerCrashes map[string]prometheus.Counter
workerRestarts map[string]prometheus.Counter
workerRequestTime map[string]prometheus.Counter
workerRequestCount map[string]prometheus.Counter
workerQueueDepth map[string]prometheus.Gauge
totalWorkers *prometheus.GaugeVec
busyWorkers *prometheus.GaugeVec
readyWorkers *prometheus.GaugeVec
workerCrashes *prometheus.CounterVec
workerRestarts *prometheus.CounterVec
workerRequestTime *prometheus.CounterVec
workerRequestCount *prometheus.CounterVec
workerQueueDepth *prometheus.GaugeVec
queueDepth prometheus.Gauge
mu sync.Mutex
}
func (m *PrometheusMetrics) getLabels(name string) prometheus.Labels {
return prometheus.Labels{"worker": name}
}
func (m *PrometheusMetrics) StartWorker(name string) {
m.busyThreads.Inc()
// tests do not register workers before starting them
if _, ok := m.totalWorkers[name]; !ok {
if m.totalWorkers == nil {
return
}
m.totalWorkers[name].Inc()
m.totalWorkers.With(m.getLabels(name)).Inc()
}
func (m *PrometheusMetrics) ReadyWorker(name string) {
if _, ok := m.totalWorkers[name]; !ok {
if m.totalWorkers == nil {
return
}
m.readyWorkers[name].Inc()
m.readyWorkers.With(m.getLabels(name)).Inc()
}
func (m *PrometheusMetrics) StopWorker(name string, reason StopReason) {
m.busyThreads.Dec()
// tests do not register workers before starting them
if _, ok := m.totalWorkers[name]; !ok {
if m.totalWorkers == nil {
return
}
m.totalWorkers[name].Dec()
m.readyWorkers[name].Dec()
metricLabels := m.getLabels(name)
m.totalWorkers.With(metricLabels).Dec()
m.readyWorkers.With(metricLabels).Dec()
if reason == StopReasonCrash {
m.workerCrashes[name].Inc()
m.workerCrashes.With(metricLabels).Inc()
} else if reason == StopReasonRestart {
m.workerRestarts[name].Inc()
} else if reason == StopReasonShutdown {
m.totalWorkers[name].Dec()
m.workerRestarts.With(metricLabels).Inc()
}
}
func (m *PrometheusMetrics) getIdentity(name string) (string, error) {
actualName, err := fastabs.FastAbs(name)
if err != nil {
return name, err
}
return actualName, nil
}
func (m *PrometheusMetrics) TotalWorkers(name string, _ int) {
func (m *PrometheusMetrics) TotalWorkers(string, int) {
m.mu.Lock()
defer m.mu.Unlock()
identity, err := m.getIdentity(name)
if err != nil {
// do not create metrics, let error propagate when worker is started
return
}
const ns, sub = "frankenphp", "worker"
basicLabels := []string{"worker"}
subsystem := getWorkerNameForMetrics(name)
if _, ok := m.totalWorkers[identity]; !ok {
m.totalWorkers[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
if m.totalWorkers == nil {
m.totalWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "total_workers",
Help: "Total number of PHP workers for this worker",
})
if err := m.registry.Register(m.totalWorkers[identity]); err != nil &&
}, basicLabels)
if err := m.registry.Register(m.totalWorkers); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.workerCrashes[identity]; !ok {
m.workerCrashes[identity] = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_crashes",
Help: "Number of PHP worker crashes for this worker",
})
if err := m.registry.Register(m.workerCrashes[identity]); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.workerRestarts[identity]; !ok {
m.workerRestarts[identity] = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_restarts",
Help: "Number of PHP worker restarts for this worker",
})
if err := m.registry.Register(m.workerRestarts[identity]); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.readyWorkers[identity]; !ok {
m.readyWorkers[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
if m.readyWorkers == nil {
m.readyWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "ready_workers",
Help: "Running workers that have successfully called frankenphp_handle_request at least once",
})
if err := m.registry.Register(m.readyWorkers[identity]); err != nil &&
}, basicLabels)
if err := m.registry.Register(m.readyWorkers); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.busyWorkers[identity]; !ok {
m.busyWorkers[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
if m.busyWorkers == nil {
m.busyWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "busy_workers",
Help: "Number of busy PHP workers for this worker",
})
if err := m.registry.Register(m.busyWorkers[identity]); err != nil &&
}, basicLabels)
if err := m.registry.Register(m.busyWorkers); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.workerRequestTime[identity]; !ok {
m.workerRequestTime[identity] = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_request_time",
})
if err := m.registry.Register(m.workerRequestTime[identity]); err != nil &&
if m.workerCrashes == nil {
m.workerCrashes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: sub,
Name: "crashes",
Help: "Number of PHP worker crashes for this worker",
}, basicLabels)
if err := m.registry.Register(m.workerCrashes); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.workerRequestCount[identity]; !ok {
m.workerRequestCount[identity] = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_request_count",
})
if err := m.registry.Register(m.workerRequestCount[identity]); err != nil &&
if m.workerRestarts == nil {
m.workerRestarts = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: sub,
Name: "restarts",
Help: "Number of PHP worker restarts for this worker",
}, basicLabels)
if err := m.registry.Register(m.workerRestarts); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if _, ok := m.workerQueueDepth[identity]; !ok {
m.workerQueueDepth[identity] = prometheus.NewGauge(prometheus.GaugeOpts{
if m.workerRequestTime == nil {
m.workerRequestTime = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: sub,
Name: "request_time",
}, basicLabels)
if err := m.registry.Register(m.workerRequestTime); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if m.workerRequestCount == nil {
m.workerRequestCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: sub,
Name: "request_count",
}, basicLabels)
if err := m.registry.Register(m.workerRequestCount); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
if m.workerQueueDepth == nil {
m.workerQueueDepth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "frankenphp",
Subsystem: subsystem,
Name: "worker_queue_depth",
})
m.registry.MustRegister(m.workerQueueDepth[identity])
Subsystem: sub,
Name: "queue_depth",
}, basicLabels)
if err := m.registry.Register(m.workerQueueDepth); err != nil &&
!errors.As(err, &prometheus.AlreadyRegisteredError{}) {
panic(err)
}
}
}
@@ -273,34 +258,35 @@ func (m *PrometheusMetrics) StopRequest() {
}
func (m *PrometheusMetrics) StopWorkerRequest(name string, duration time.Duration) {
if _, ok := m.workerRequestTime[name]; !ok {
if m.workerRequestTime == nil {
return
}
m.workerRequestCount[name].Inc()
m.busyWorkers[name].Dec()
m.workerRequestTime[name].Add(duration.Seconds())
metricLabels := m.getLabels(name)
m.workerRequestCount.With(metricLabels).Inc()
m.busyWorkers.With(metricLabels).Dec()
m.workerRequestTime.With(metricLabels).Add(duration.Seconds())
}
func (m *PrometheusMetrics) StartWorkerRequest(name string) {
if _, ok := m.busyWorkers[name]; !ok {
if m.busyWorkers == nil {
return
}
m.busyWorkers[name].Inc()
m.busyWorkers.With(m.getLabels(name)).Inc()
}
func (m *PrometheusMetrics) QueuedWorkerRequest(name string) {
if _, ok := m.workerQueueDepth[name]; !ok {
if m.workerQueueDepth == nil {
return
}
m.workerQueueDepth[name].Inc()
m.workerQueueDepth.With(m.getLabels(name)).Inc()
}
func (m *PrometheusMetrics) DequeuedWorkerRequest(name string) {
if _, ok := m.workerQueueDepth[name]; !ok {
if m.workerQueueDepth == nil {
return
}
m.workerQueueDepth[name].Dec()
m.workerQueueDepth.With(m.getLabels(name)).Dec()
}
func (m *PrometheusMetrics) QueuedRequest() {
@@ -316,36 +302,44 @@ func (m *PrometheusMetrics) Shutdown() {
m.registry.Unregister(m.busyThreads)
m.registry.Unregister(m.queueDepth)
for _, g := range m.totalWorkers {
m.registry.Unregister(g)
if m.totalWorkers != nil {
m.registry.Unregister(m.totalWorkers)
m.totalWorkers = nil
}
for _, g := range m.busyWorkers {
m.registry.Unregister(g)
if m.busyWorkers != nil {
m.registry.Unregister(m.busyWorkers)
m.busyWorkers = nil
}
for _, c := range m.workerRequestTime {
m.registry.Unregister(c)
if m.workerRequestTime != nil {
m.registry.Unregister(m.workerRequestTime)
m.workerRequestTime = nil
}
for _, c := range m.workerRequestCount {
m.registry.Unregister(c)
if m.workerRequestCount != nil {
m.registry.Unregister(m.workerRequestCount)
m.workerRequestCount = nil
}
for _, c := range m.workerCrashes {
m.registry.Unregister(c)
if m.workerCrashes != nil {
m.registry.Unregister(m.workerCrashes)
m.workerCrashes = nil
}
for _, c := range m.workerRestarts {
m.registry.Unregister(c)
if m.workerRestarts != nil {
m.registry.Unregister(m.workerRestarts)
m.workerRestarts = nil
}
for _, g := range m.readyWorkers {
m.registry.Unregister(g)
if m.readyWorkers != nil {
m.registry.Unregister(m.readyWorkers)
m.readyWorkers = nil
}
for _, g := range m.workerQueueDepth {
m.registry.Unregister(g)
if m.workerQueueDepth != nil {
m.registry.Unregister(m.workerQueueDepth)
m.workerQueueDepth = nil
}
m.totalThreads = prometheus.NewCounter(prometheus.CounterOpts{
@@ -356,14 +350,6 @@ func (m *PrometheusMetrics) Shutdown() {
Name: "frankenphp_busy_threads",
Help: "Number of busy PHP threads",
})
m.totalWorkers = map[string]prometheus.Gauge{}
m.busyWorkers = map[string]prometheus.Gauge{}
m.workerRequestTime = map[string]prometheus.Counter{}
m.workerRequestCount = map[string]prometheus.Counter{}
m.workerRestarts = map[string]prometheus.Counter{}
m.workerCrashes = map[string]prometheus.Counter{}
m.readyWorkers = map[string]prometheus.Gauge{}
m.workerQueueDepth = map[string]prometheus.Gauge{}
m.queueDepth = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "frankenphp_queue_depth",
Help: "Number of regular queued requests",
@@ -385,13 +371,6 @@ func (m *PrometheusMetrics) Shutdown() {
}
}
func getWorkerNameForMetrics(name string) string {
name = metricsNameRegex.ReplaceAllString(name, "_")
name = metricsNameFixRegex.ReplaceAllString(name, "")
return name
}
func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
if registry == nil {
registry = prometheus.NewRegistry()
@@ -407,18 +386,18 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics {
Name: "frankenphp_busy_threads",
Help: "Number of busy PHP threads",
}),
totalWorkers: map[string]prometheus.Gauge{},
busyWorkers: map[string]prometheus.Gauge{},
workerRequestTime: map[string]prometheus.Counter{},
workerRequestCount: map[string]prometheus.Counter{},
workerRestarts: map[string]prometheus.Counter{},
workerCrashes: map[string]prometheus.Counter{},
readyWorkers: map[string]prometheus.Gauge{},
workerQueueDepth: map[string]prometheus.Gauge{},
queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "frankenphp_queue_depth",
Help: "Number of regular queued requests",
}),
totalWorkers: nil,
busyWorkers: nil,
workerRequestTime: nil,
workerRequestCount: nil,
workerRestarts: nil,
workerCrashes: nil,
readyWorkers: nil,
workerQueueDepth: nil,
}
if err := m.registry.Register(m.totalThreads); err != nil &&

View File

@@ -1,86 +1,195 @@
package frankenphp
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
func TestGetWorkerNameForMetrics(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"worker-1", "worker_1"},
{"worker@name", "worker_name"},
{"worker name", "worker_name"},
{"worker/name", "worker_name"},
{"worker.name", "worker_name"},
{"////worker////name...//worker", "worker_name_worker"},
}
for _, test := range tests {
result := getWorkerNameForMetrics(test.input)
assert.Equal(t, test.expected, result)
}
}
func createPrometheusMetrics() *PrometheusMetrics {
return &PrometheusMetrics{
registry: prometheus.NewRegistry(),
totalThreads: prometheus.NewCounter(prometheus.CounterOpts{Name: "total_threads"}),
busyThreads: prometheus.NewGauge(prometheus.GaugeOpts{Name: "busy_threads"}),
totalWorkers: make(map[string]prometheus.Gauge),
busyWorkers: make(map[string]prometheus.Gauge),
workerRequestTime: make(map[string]prometheus.Counter),
workerRequestCount: make(map[string]prometheus.Counter),
workerCrashes: make(map[string]prometheus.Counter),
workerRestarts: make(map[string]prometheus.Counter),
workerQueueDepth: make(map[string]prometheus.Gauge),
readyWorkers: make(map[string]prometheus.Gauge),
mu: sync.Mutex{},
registry: prometheus.NewRegistry(),
totalThreads: prometheus.NewCounter(prometheus.CounterOpts{Name: "frankenphp_total_threads"}),
busyThreads: prometheus.NewGauge(prometheus.GaugeOpts{Name: "frankenphp_busy_threads"}),
queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{Name: "frankenphp_queue_depth"}),
mu: sync.Mutex{},
}
}
func TestPrometheusMetrics_TotalWorkers(t *testing.T) {
m := createPrometheusMetrics()
tests := []struct {
name string
worker string
num int
}{
{"SetWorkers", "test_worker", 5},
}
require.Nil(t, m.totalWorkers)
require.Nil(t, m.busyWorkers)
require.Nil(t, m.readyWorkers)
require.Nil(t, m.workerCrashes)
require.Nil(t, m.workerRestarts)
require.Nil(t, m.workerRequestTime)
require.Nil(t, m.workerRequestCount)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m.TotalWorkers(tt.worker, tt.num)
actualName, _ := m.getIdentity(tt.worker)
_, ok := m.totalWorkers[actualName]
require.True(t, ok)
})
}
m.TotalWorkers("test_worker", 2)
require.NotNil(t, m.totalWorkers)
require.NotNil(t, m.busyWorkers)
require.NotNil(t, m.readyWorkers)
require.NotNil(t, m.workerCrashes)
require.NotNil(t, m.workerRestarts)
require.NotNil(t, m.workerRequestTime)
require.NotNil(t, m.workerRequestCount)
}
func TestPrometheusMetrics_StopWorkerRequest(t *testing.T) {
m := createPrometheusMetrics()
m.TotalWorkers("test_worker", 2)
m.StopWorkerRequest("test_worker", 2*time.Second)
name := "test_worker"
_, ok := m.workerRequestTime[name]
require.False(t, ok)
inputs := []struct {
name string
c prometheus.Collector
metadata string
expect string
}{
{
name: "Testing WorkerRequestCount",
c: m.workerRequestCount,
metadata: `
# HELP frankenphp_worker_request_count
# TYPE frankenphp_worker_request_count counter
`,
expect: `
frankenphp_worker_request_count{worker="test_worker"} 1
`,
},
{
name: "Testing BusyWorkers",
c: m.busyWorkers,
metadata: `
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
`,
expect: `
frankenphp_busy_workers{worker="test_worker"} -1
`,
},
{
name: "Testing WorkerRequestTime",
c: m.workerRequestTime,
metadata: `
# HELP frankenphp_worker_request_time
# TYPE frankenphp_worker_request_time counter
`,
expect: `
frankenphp_worker_request_time{worker="test_worker"} 2
`,
},
}
for _, input := range inputs {
t.Run(input.name, func(t *testing.T) {
require.NoError(t, testutil.CollectAndCompare(input.c, strings.NewReader(input.metadata+input.expect)))
})
}
}
func TestPrometheusMetrics_StartWorkerRequest(t *testing.T) {
m := createPrometheusMetrics()
m.TotalWorkers("test_worker", 2)
m.StartWorkerRequest("test_worker")
name := "test_worker"
_, ok := m.workerRequestCount[name]
require.False(t, ok)
inputs := []struct {
name string
c prometheus.Collector
metadata string
expect string
}{
{
name: "Testing BusyWorkers",
c: m.busyWorkers,
metadata: `
# HELP frankenphp_busy_workers Number of busy PHP workers for this worker
# TYPE frankenphp_busy_workers gauge
`,
expect: `
frankenphp_busy_workers{worker="test_worker"} 1
`,
},
}
for _, input := range inputs {
t.Run(input.name, func(t *testing.T) {
require.NoError(t, testutil.CollectAndCompare(input.c, strings.NewReader(input.metadata+input.expect)))
})
}
}
func TestPrometheusMetrics_TestStopReasonCrash(t *testing.T) {
m := createPrometheusMetrics()
m.TotalWorkers("test_worker", 2)
m.StopWorker("test_worker", StopReasonCrash)
inputs := []struct {
name string
c prometheus.Collector
metadata string
expect string
}{
{
name: "Testing BusyThreads",
c: m.busyThreads,
metadata: `
# HELP frankenphp_busy_threads
# TYPE frankenphp_busy_threads gauge
`,
expect: `
frankenphp_busy_threads -1
`,
},
{
name: "Testing TotalWorkers",
c: m.totalWorkers,
metadata: `
# HELP frankenphp_total_workers Total number of PHP workers for this worker
# TYPE frankenphp_total_workers gauge
`,
expect: `
frankenphp_total_workers{worker="test_worker"} -1
`,
},
{
name: "Testing ReadyWorkers",
c: m.readyWorkers,
metadata: `
# HELP frankenphp_ready_workers Running workers that have successfully called frankenphp_handle_request at least once
# TYPE frankenphp_ready_workers gauge
`,
expect: `
frankenphp_ready_workers{worker="test_worker"} -1
`,
},
{
name: "Testing WorkerCrashes",
c: m.workerCrashes,
metadata: `
# HELP frankenphp_worker_crashes Number of PHP worker crashes for this worker
# TYPE frankenphp_worker_crashes counter
`,
expect: `
frankenphp_worker_crashes{worker="test_worker"} 1
`,
},
}
for _, input := range inputs {
t.Run(input.name, func(t *testing.T) {
require.NoError(t, testutil.CollectAndCompare(input.c, strings.NewReader(input.metadata+input.expect)))
})
}
}

View File

@@ -23,6 +23,7 @@ type opt struct {
}
type workerOpt struct {
name string
fileName string
num int
env PreparedEnv
@@ -55,9 +56,9 @@ func WithMetrics(m Metrics) Option {
}
// WithWorkers configures the PHP workers to start.
func WithWorkers(fileName string, num int, env map[string]string, watch []string) Option {
func WithWorkers(name string, fileName string, num int, env map[string]string, watch []string) Option {
return func(o *opt) error {
o.workers = append(o.workers, workerOpt{fileName, num, PrepareEnv(env), watch})
o.workers = append(o.workers, workerOpt{name, fileName, num, PrepareEnv(env), watch})
return nil
}

View File

@@ -88,12 +88,14 @@ func TestTransitionThreadsWhileDoingRequests(t *testing.T) {
isDone := atomic.Bool{}
wg := sync.WaitGroup{}
worker1Path := testDataPath + "/transition-worker-1.php"
worker1Name := "worker-1"
worker2Path := testDataPath + "/transition-worker-2.php"
worker2Name := "worker-2"
assert.NoError(t, Init(
WithNumThreads(numThreads),
WithWorkers(worker1Path, 1, map[string]string{}, []string{}),
WithWorkers(worker2Path, 1, map[string]string{}, []string{}),
WithWorkers(worker1Name, worker1Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithWorkers(worker2Name, worker2Path, 1, map[string]string{"ENV1": "foo"}, []string{}),
WithLogger(zap.NewNop()),
))

View File

@@ -101,7 +101,7 @@ func scaleWorkerThread(worker *worker) {
thread, err := addWorkerThread(worker)
if err != nil {
if c := logger.Check(zapcore.WarnLevel, "could not increase max_threads, consider raising this limit"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Error(err))
c.Write(zap.String("worker", worker.name), zap.Error(err))
}
return
}

View File

@@ -31,11 +31,12 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) {
}
func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
workerName := "worker1"
workerPath := testDataPath + "/transition-worker-1.php"
assert.NoError(t, Init(
WithNumThreads(2),
WithMaxThreads(3),
WithWorkers(workerPath, 1, map[string]string{}, []string{}),
WithWorkers(workerName, workerPath, 1, map[string]string{}, []string{}),
WithLogger(zap.NewNop()),
))

View File

@@ -76,7 +76,7 @@ func (handler *workerThread) name() string {
func setupWorkerScript(handler *workerThread, worker *worker) {
handler.backoff.wait()
metrics.StartWorker(worker.fileName)
metrics.StartWorker(worker.name)
// Create a dummy request to set up the worker
fc, err := newDummyContext(
@@ -96,7 +96,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) {
handler.isBootingScript = true
clearSandboxedEnv(handler.thread)
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
c.Write(zap.String("worker", worker.name), zap.Int("thread", handler.thread.threadIndex))
}
}
@@ -114,30 +114,30 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
// on exit status 0 we just run the worker script again
if exitStatus == 0 && !handler.isBootingScript {
// TODO: make the max restart configurable
metrics.StopWorker(worker.fileName, StopReasonRestart)
metrics.StopWorker(worker.name, StopReasonRestart)
handler.backoff.recordSuccess()
if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
c.Write(zap.String("worker", worker.fileName))
c.Write(zap.String("worker", worker.name))
}
return
}
// worker has thrown a fatal error or has not reached frankenphp_handle_request
metrics.StopWorker(worker.fileName, StopReasonCrash)
metrics.StopWorker(worker.name, StopReasonCrash)
if !handler.isBootingScript {
// fatal error (could be due to timeouts, etc.)
return
}
logger.Error("worker script has not reached frankenphp_handle_request", zap.String("worker", worker.fileName))
logger.Error("worker script has not reached frankenphp_handle_request", zap.String("worker", worker.name))
// panic after exponential backoff if the worker has never reached frankenphp_handle_request
if handler.backoff.recordFailure() {
if !watcherIsEnabled && !handler.state.is(stateReady) {
logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
logger.Panic("too many consecutive worker failures", zap.String("worker", worker.name), zap.Int("failures", handler.backoff.failureCount))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
logger.Warn("many consecutive worker failures", zap.String("worker", worker.name), zap.Int("failures", handler.backoff.failureCount))
}
}
@@ -147,7 +147,7 @@ func (handler *workerThread) waitForWorkerRequest() bool {
handler.thread.Unpin()
if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
c.Write(zap.String("worker", handler.worker.name))
}
// Clear the first dummy request created to initialize the worker
@@ -162,7 +162,7 @@ func (handler *workerThread) waitForWorkerRequest() bool {
// 'stateTransitionComplete' is only true on the first boot of the worker script,
// while 'isBootingScript' is true on every boot of the worker script
if handler.state.is(stateTransitionComplete) {
metrics.ReadyWorker(handler.worker.fileName)
metrics.ReadyWorker(handler.worker.name)
handler.state.set(stateReady)
}
@@ -172,7 +172,7 @@ func (handler *workerThread) waitForWorkerRequest() bool {
select {
case <-handler.thread.drainChan:
if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName))
c.Write(zap.String("worker", handler.worker.name))
}
// flush the opcache when restarting due to watcher or admin api
@@ -190,13 +190,13 @@ func (handler *workerThread) waitForWorkerRequest() bool {
handler.state.markAsWaiting(false)
if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", fc.request.RequestURI))
c.Write(zap.String("worker", handler.worker.name), zap.String("url", fc.request.RequestURI))
}
if err := updateServerContext(handler.thread, fc, true); err != nil {
// Unexpected error or invalid request
if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", fc.request.RequestURI), zap.Error(err))
c.Write(zap.String("worker", handler.worker.name), zap.String("url", fc.request.RequestURI), zap.Error(err))
}
fc.rejectBadRequest(err.Error())
handler.workerContext = nil

View File

@@ -4,15 +4,16 @@ package frankenphp
import "C"
import (
"fmt"
"github.com/dunglas/frankenphp/internal/fastabs"
"sync"
"time"
"github.com/dunglas/frankenphp/internal/fastabs"
"github.com/dunglas/frankenphp/internal/watcher"
)
// represents a worker script and can have many threads assigned to it
type worker struct {
name string
fileName string
num int
env PreparedEnv
@@ -75,6 +76,7 @@ func newWorker(o workerOpt) (*worker, error) {
o.env["FRANKENPHP_WORKER\x00"] = "1"
w := &worker{
name: o.name,
fileName: absFileName,
num: o.num,
env: o.env,
@@ -170,7 +172,7 @@ func (worker *worker) countThreads() int {
}
func (worker *worker) handleRequest(fc *frankenPHPContext) {
metrics.StartWorkerRequest(fc.scriptFilename)
metrics.StartWorkerRequest(worker.name)
// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
@@ -179,7 +181,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
case thread.requestChan <- fc:
worker.threadMutex.RUnlock()
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
return
default:
// thread is busy, continue
@@ -188,13 +190,13 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
worker.threadMutex.RUnlock()
// if no thread was available, mark the request as queued and apply the scaling strategy
metrics.QueuedWorkerRequest(fc.scriptFilename)
metrics.QueuedWorkerRequest(worker.name)
for {
select {
case worker.requestChan <- fc:
metrics.DequeuedWorkerRequest(fc.scriptFilename)
metrics.DequeuedWorkerRequest(worker.name)
<-fc.done
metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
metrics.StopWorkerRequest(worker.name, time.Since(fc.startedAt))
return
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread

View File

@@ -118,8 +118,8 @@ func TestWorkerGetOpt(t *testing.T) {
func ExampleServeHTTP_workers() {
if err := frankenphp.Init(
frankenphp.WithWorkers("worker1.php", 4, map[string]string{"ENV1": "foo"}, []string{}),
frankenphp.WithWorkers("worker2.php", 2, map[string]string{"ENV2": "bar"}, []string{}),
frankenphp.WithWorkers("worker1", "worker1.php", 4, map[string]string{"ENV1": "foo"}, []string{}),
frankenphp.WithWorkers("worker1", "worker2.php", 2, map[string]string{"ENV2": "bar"}, []string{}),
); err != nil {
panic(err)
}