Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): handle the case where the worker is already assigned to a thread #1171

Merged
merged 14 commits into the base branch
Nov 21, 2024
Merged
8 changes: 4 additions & 4 deletions caddy/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
{
skip_install_trust
admin localhost:2999
http_port 9080
http_port `+testPort+`

frankenphp {
worker {
Expand All @@ -26,13 +26,13 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
}
}

localhost:9080 {
localhost:`+testPort+` {
root ../testdata
rewrite worker-with-watcher.php
php
}
`, "caddyfile")

tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:1")
tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:2")
tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:1")
tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:2")
}
6 changes: 4 additions & 2 deletions php_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "C"
import (
"net/http"
"runtime"
"sync"
"unsafe"
)

Expand All @@ -19,6 +20,7 @@ type phpThread struct {
worker *worker
requestChan chan *http.Request
knownVariableKeys map[string]*C.zend_string
readiedOnce sync.Once
}

func initPHPThreads(numThreads int) {
Expand All @@ -28,7 +30,7 @@ func initPHPThreads(numThreads int) {
}
}

func (thread phpThread) getActiveRequest() *http.Request {
func (thread *phpThread) getActiveRequest() *http.Request {
if thread.workerRequest != nil {
return thread.workerRequest
}
Expand All @@ -46,5 +48,5 @@ func (thread *phpThread) pinString(s string) *C.char {

// C strings must be null-terminated
func (thread *phpThread) pinCString(s string) *C.char {
return thread.pinString(s+"\x00")
return thread.pinString(s + "\x00")
}
68 changes: 43 additions & 25 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/dunglas/frankenphp/internal/watcher"
Expand All @@ -24,6 +23,7 @@ type worker struct {
requestChan chan *http.Request
threads []*phpThread
threadMutex sync.RWMutex
ready chan struct{}
}

const maxWorkerErrorBackoff = 1 * time.Second
Expand All @@ -32,34 +32,28 @@ const maxWorkerConsecutiveFailures = 6

var (
watcherIsEnabled bool
workersReadyWG sync.WaitGroup
workerShutdownWG sync.WaitGroup
workersAreReady atomic.Bool
workersAreDone atomic.Bool
workersDone chan interface{}
workers = make(map[string]*worker)
)

func initWorkers(opt []workerOpt) error {
workersDone = make(chan interface{})
workersAreReady.Store(false)
workersAreDone.Store(false)

for _, o := range opt {
worker, err := newWorker(o)
worker.threads = make([]*phpThread, 0, o.num)
if err != nil {
return err
}
workersReadyWG.Add(worker.num)
for i := 0; i < worker.num; i++ {
go worker.startNewWorkerThread()
}
for i := 0; i < worker.num; i++ {
<-worker.ready
withinboredom marked this conversation as resolved.
Show resolved Hide resolved
}
}

workersReadyWG.Wait()
workersAreReady.Store(true)

return nil
}

Expand All @@ -80,7 +74,13 @@ func newWorker(o workerOpt) (*worker, error) {
}

o.env["FRANKENPHP_WORKER\x00"] = "1"
w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
w := &worker{
fileName: absFileName,
num: o.num,
env: o.env,
requestChan: make(chan *http.Request),
ready: make(chan struct{}),
}
workers[absFileName] = w

return w, nil
Expand All @@ -95,14 +95,14 @@ func (worker *worker) startNewWorkerThread() {
backingOffLock := sync.RWMutex{}

for {

// if the worker can stay up longer than backoff*2, it is probably an application error
upFunc := sync.Once{}
go func() {
backingOffLock.RLock()
wait := backoff * 2
backingOffLock.RUnlock()
time.Sleep(wait)
worker.ready <- struct{}{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the correct approach. What if the worker script takes longer than 200ms to start and then fails? Additionally, this will unnecessarily delay startup in most cases.
This line should probably be moved to metrics.ReadyWorker, right after frankenphp_handle_request() is called.

Copy link
Collaborator Author

@withinboredom withinboredom Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I'm not sure why we need to wait for them to be fully ready other than for ensuring that tests can be run. This implementation ensures that we can continue and can be reasonably confident that the workers are executing and not immediately crashing. Whether the application is ready to serve or not is in the domain of the application through liveness and health checks.

In fact, in the current implementation, it is continuing before all threads are ready anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it will definitely wait for all threads to be ready via the workersReadyWG, which only reaches 0 after each thread has called frankenphp_handle_request(). With this implementation we could reach a 'ready' state even if the worker script doesn't contain frankenphp_handle_request().
I guess it depends on when you would consider the worker thread to be 'ready', but why go with 'reasonable confidence' when you can have absolute confidence instead? (and be faster in most cases)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it will definitely wait for all threads to be ready via the workersReadyWG, which only reaches 0 after each thread has called frankenphp_handle_request().

I honestly don't remember what I ran into last night that specifically triggered this rabbit hole, other than turning on the watcher caused all kinds of problems with the metrics being output. TL;DR: I was just trying to simplify things to understand why it is causing issues, then running into other issues as I went, yak shaving my way through it all.

I ran into a number of minor issues that sometimes made it past the atomics, like restarting during shutdown. Atomics are terrible unless you are using CAS, and you are not guaranteed to get the latest value ... so I deleted them. From there, it was just figuring out what was going wrong with the WorkersReadyWG because as mentioned, the current architecture doesn't really make sense for them, but we don't want to run tests until workers are started.

So, most of the refactoring is just in getting rid of atomics. This then uncovered new failing tests 100% of the time instead of randomly, particularly when restarting workers... and so on.

After all this, I still don't know why watchers specifically caused my issues 😆

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it depends on when you would consider the worker thread to be 'ready', but why go with 'reasonable confidence' when you can have absolute confidence instead? (and be faster in most cases)

I moved it.

TL;DR: I was just trying to simplify things to understand why it is causing issues

I don't think I'll merge this PR, but continue working on a PR for your PR. This metric is fubar for now, but I think it is worth focusing on the work there.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I originally intended my refactor PR to just be a small step towards more control over PHP threads. But then I realized it could potentially also fix Fibers and it kinda ended up doing more than I wanted it to.
Problem with these big PRs is that they kind of block other PRs (like this one).

Copy link
Collaborator Author

@withinboredom withinboredom Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should take smaller steps in our PR. Unfortunately, I never did figure out what was broken here, short of refactoring the entire worker stuff. :(

It might be worth working on an actual design in a Discussion? That way we'd be able to work on different parts independently and break it down into smaller PRs. Maybe we could take a 'strangler fig' approach and reimplement it separate from the current code. So, we could just use build tags to switch between worker implementations (at least while we are working on it), and people could test it more easily.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
Expand Down Expand Up @@ -146,8 +146,14 @@ func (worker *worker) startNewWorkerThread() {
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

// if we are done, exit the loop that restarts the worker script
if workersAreDone.Load() {
break
select {
case _, ok := <-workersDone:
if !ok {
return
}
// continue on since the channel is still open
default:
// continue on since the channel is still open
}

// on exit status 0 we just run the worker script again
Expand Down Expand Up @@ -185,12 +191,7 @@ func (worker *worker) startNewWorkerThread() {
metrics.StopWorker(worker.fileName, StopReasonCrash)
}

metrics.StopWorker(worker.fileName, StopReasonShutdown)

// TODO: check if the termination is expected
if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
c.Write(zap.String("worker", worker.fileName))
}
// unreachable
}

func (worker *worker) handleRequest(r *http.Request) {
Expand All @@ -211,7 +212,6 @@ func (worker *worker) handleRequest(r *http.Request) {
}

func stopWorkers() {
workersAreDone.Store(true)
close(workersDone)
}

Expand Down Expand Up @@ -254,15 +254,11 @@ func restartWorkers(workerOpts []workerOpt) {

func assignThreadToWorker(thread *phpThread) {
fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
metrics.ReadyWorker(fc.scriptFilename)
worker, ok := workers[fc.scriptFilename]
if !ok {
panic("worker not found for script: " + fc.scriptFilename)
}
thread.worker = worker
if !workersAreReady.Load() {
workersReadyWG.Done()
}
thread.requestChan = make(chan *http.Request)
worker.threadMutex.Lock()
worker.threads = append(worker.threads, thread)
Expand All @@ -273,10 +269,23 @@ func assignThreadToWorker(thread *phpThread) {
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
thread := phpThreads[threadIndex]

select {
case _, ok := <-workersDone:
if !ok {
// attempted to restart during shutdown
return C.bool(false)
}
default:
}

// we assign a worker to the thread if it doesn't have one already
if thread.worker == nil {
assignThreadToWorker(thread)
}
thread.readiedOnce.Do(func() {
// inform metrics that the worker is ready
metrics.ReadyWorker(thread.worker.fileName)
})

if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
c.Write(zap.String("worker", thread.worker.fileName))
Expand Down Expand Up @@ -343,5 +352,14 @@ func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool)

if isWorkerRequest {
thread.Unpin()
workers[fc.scriptFilename].threadMutex.Lock()
defer workers[fc.scriptFilename].threadMutex.Unlock()
for i, t := range workers[fc.scriptFilename].threads {
if t == thread {
// remove thread from worker threads
workers[fc.scriptFilename].threads = append(workers[fc.scriptFilename].threads[:i], workers[fc.scriptFilename].threads[i+1:]...)
break
}
}
withinboredom marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading