-
-
Notifications
You must be signed in to change notification settings - Fork 247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: decouple worker threads from non-worker threads #1137
base: main
Are you sure you want to change the base?
Conversation
# Conflicts: # frankenphp.c # frankenphp.go # php_thread.go # worker.go
Hmm that segfault is interesting, It's probably not fully safe to execute a PHP script while calling |
# Conflicts: # frankenphp.go
This approach seems to actually fix fibers? @dunglas @withinboredom At least tests were red in #1151 and now the |
frankenphp.c
Outdated
while (true) { | ||
char *scriptName = go_frankenphp_on_thread_work(thread_index); | ||
|
||
// if the script name is NULL, the thread should exit | ||
if (scriptName == NULL) { | ||
break; | ||
} | ||
|
||
// if the script name is not empty, execute the PHP script | ||
if (strlen(scriptName) != 0) { | ||
int exit_status = frankenphp_execute_script(scriptName); | ||
go_frankenphp_after_thread_work(thread_index, exit_status); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Running the 'thread work' loop like this seems to fix fibers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe calling the methods before_script_execution
and after_script_execution
would make more sense then.
The reason fibers fail is go-C-go, and fibers move the stack, which causes go to panic. If you can remove one of those, go is quite happy. |
So C-go-C should be fine then 👍 |
As long as it isn't possible to end up with C-go-C-go, yeah. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking pretty good. I see a lot of uses of atomics, which are usually red flags unless you are doing low-level things.
For example, it may be better to put into the worker struct a boolean for "isRestarting" that gets unconditionally set to false
once a worker is started. When restarting, set to true
. Then just check that boolean. Otherwise, the workersAreRestarting
is bound to a specific time. There's a couple other cases I pointed out where you've bound the execution to time, when it would be better to bind it to the struct lifetime (threadsAreBooting
and isReady
).
In all, I think this is a great step in the right direction! I've checked it out and will give it a whirl.
98ba095
to
06ebd67
Compare
I'll be honest, after spending awhile on this branch and trying to make some changes... I don't think this is a step in the right direction. The "hooks" system makes things quite complex to debug, especially since hooks can be swapped out during execution. All the boolean flags and waitgroups makes it even harder to find out where things are going wrong and why. In essence, it feels like a Rube Goldberg machine when stepping line by line. It's pretty fun to do, but it doesn't feel very solid and feels easy to break. Have you considered modeling this as a state machine? Maybe something like this: type workerState int
const (
workerStateInactive workerState = iota
workerStateReady
workerStateActive
workerStateDrain
)
type workerStateMachine struct {
currentState workerState
booting bool
mu sync.RWMutex
subscribers map[workerState][]chan struct{}
}
// Transition models a state machine for workers.
// A worker thread moves from inactive to ready to active to drain to inactive.
// Inactive means a thread is not currently uninitialized.
// Ready means a thread has prepared to run a request.
// Active means a thread is running requests or assigned to a worker.
// Drain means a thread will not accept new requests and return to Inactive after the current request.
func (w *workerStateMachine) Transition(nextState workerState) {
w.mu.Lock()
defer w.mu.Unlock()
if w.currentState == nextState {
return
}
notifySubs := func(state workerState) {
if c, ok := w.subscribers[state]; ok {
for _, ch := range c {
close(ch)
}
delete(w.subscribers, state)
}
}
switch w.currentState {
case workerStateInactive:
switch nextState {
case workerStateInactive:
return
case workerStateActive:
panic("worker cannot transition from inactive to active")
case workerStateReady:
w.currentState = workerStateReady
notifySubs(workerStateReady)
return
case workerStateDrain:
w.currentState = workerStateDrain
notifySubs(workerStateDrain)
return
}
case workerStateReady:
switch nextState {
case workerStateInactive:
panic("worker cannot transition from ready to inactive")
case workerStateActive:
if w.booting {
w.booting = false
}
w.currentState = workerStateActive
notifySubs(workerStateActive)
return
case workerStateReady:
return
case workerStateDrain:
w.currentState = workerStateDrain
notifySubs(workerStateDrain)
return
}
case workerStateActive:
switch nextState {
case workerStateInactive:
panic("worker cannot transition from active to inactive")
case workerStateActive:
return
case workerStateReady:
panic("worker cannot transition from active to ready")
case workerStateDrain:
w.currentState = workerStateDrain
notifySubs(workerStateDrain)
return
}
case workerStateDrain:
switch nextState {
case workerStateInactive:
w.currentState = workerStateInactive
notifySubs(workerStateInactive)
return
case workerStateActive:
panic("worker cannot transition from drain to active")
case workerStateReady:
panic("worker cannot transition from drain to ready")
case workerStateDrain:
return
}
}
}
func (w *workerStateMachine) CurrentState() workerState {
w.mu.RLock()
defer w.mu.RUnlock()
return w.currentState
}
func (w *workerStateMachine) IsBooting() bool {
w.mu.RLock()
defer w.mu.RUnlock()
return w.booting
}
// WaitForNext blocks until the given state has transitioned.
func (w *workerStateMachine) WaitForNext(state workerState) {
w.mu.Lock()
if w.currentState == state {
return
}
if w.subscribers == nil {
w.subscribers = make(map[workerState][]chan struct{})
}
if _, ok := w.subscribers[state]; !ok {
w.subscribers[state] = []chan struct{}{}
}
ch := make(chan struct{})
w.subscribers[state] = append(w.subscribers[state], ch)
barrier := sync.WaitGroup{}
barrier.Add(1)
go func() {
<-ch
barrier.Done()
}()
w.mu.Unlock()
barrier.Wait()
}
// WaitFor blocks until the given state has transitioned, or returns immediately if the state has already transitioned.
func (w *workerStateMachine) WaitFor(state workerState) {
w.mu.RLock()
if w.currentState >= state {
w.mu.RUnlock()
return
}
w.mu.RUnlock()
// todo: a race can happen in this empty space
w.WaitForNext(state)
} Using this would alleviate a lot of need for waitgroups and booleans. For example, when initializing threads, you could simply do something like: ready := sync.WaitGroup{}
for _, thread := range phpThreads {
thread.setInactive()
ready.Add(1)
go func() {
thread.currentState.WaitFor(workerStateReady)
ready.Done()
}()
if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
panic(fmt.Sprintf("unable to create thread %d", thread.threadIndex))
}
}
ready.Wait() Then in the switches, just define what needs to happen on each state change per the thread lifetime. Workers and cgi-requests can be handled similarly. What do you think? |
Yeah a state machine definitely makes sense to abstract away some of the I actually quite like |
I just discovered a major issue with this implementation. Try to output a large response and you'll discover that |
Actually, this is likely to be the same case in |
Or maybe it is my branch of php-master. I'll check out an official branch later. |
Hmm that would be weird, |
Yeah, I highly suspect it is an issue with my experimental build of php. I only saw it when it doesn't write the entire response out in one go, fwiw. |
This PR refactors how threads are started and is meant as a step towards scaling threads at runtime.
How worker threads are currently started:
Currently, worker threads are started from regular threads via sending a special request
to ServeHTTP. The disadvantage here is that between sending the special worker request
and receiving it, we are losing control over which thread becomes a worker thread.
Worker threads and regular threads are inevitable coupled to each other.
How worker threads are started with this PR:
This PR decouples worker threads from regular threads and makes the
php_thread
structa wrapper around the thread's lifetime.
A 'PHP thread' is currently just a
pthread
with its ownTSRM
storage (this doesn'tnecessarily have to be tied to a real thread in the future as discussed in #1090).
The thread starts, does some work in a loop and then stops. This PR makes it possible
to configure these 3 lifetime hooks from the go side via the
php_thread
struct:This allows re-using the same mechanism for regular threads as well as worker threads.
It also makes it easier to create other potential types of threads in the future
(like 'scheduled workers' or 'task workers').
Additionally, it now would also be possible to grab an 'idle thread', exchange it's hooks and
turn it into a different type of thread at runtime without stopping the underlying thread.
(This PR doesn't go that far though)