Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock free handles #934

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 33 additions & 30 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"net/http"
"os"
"runtime"
"runtime/cgo"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -124,7 +123,7 @@ type FrankenPHPContext struct {
exitStatus C.int

done chan interface{}
currentWorkerRequest cgo.Handle
currentWorkerRequest handle
}

func clientHasClosed(r *http.Request) bool {
Expand Down Expand Up @@ -188,7 +187,6 @@ func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Reques
fc.scriptFilename = sanitizedPathJoin(fc.documentRoot, fc.scriptName)

c := context.WithValue(r.Context(), contextKey, fc)
c = context.WithValue(c, handleKey, Handles())

return r.WithContext(c), nil
}
Expand Down Expand Up @@ -308,7 +306,7 @@ func Init(options ...Option) error {

shutdownWG.Add(1)
done = make(chan struct{})
requestChan = make(chan *http.Request)
requestChan = make(chan *http.Request, opt.numThreads*2)
withinboredom marked this conversation as resolved.
Show resolved Hide resolved

if C.frankenphp_init(C.int(opt.numThreads)) != 0 {
return MainThreadCreationError
Expand Down Expand Up @@ -356,10 +354,10 @@ func getLogger() *zap.Logger {
return logger
}

func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) error {
func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) (*handle, error) {
fc, ok := FromContext(request.Context())
if !ok {
return InvalidRequestError
return nil, InvalidRequestError
}

authUser, authPassword, ok := request.BasicAuth()
Expand All @@ -379,7 +377,7 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er
var err error
contentLength, err = strconv.Atoi(contentLengthStr)
if err != nil {
return fmt.Errorf("invalid Content-Length header: %w", err)
return nil, fmt.Errorf("invalid Content-Length header: %w", err)
}
}

Expand All @@ -399,14 +397,11 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er

cRequestUri := C.CString(request.URL.RequestURI())

var rh cgo.Handle
var rh handle
if fc.responseWriter == nil {
h := cgo.NewHandle(request)
request.Context().Value(handleKey).(*handleList).AddHandle(h)
mrh = C.uintptr_t(h)
mrh = C.uintptr_t(newHandle(request))
} else {
rh = cgo.NewHandle(request)
request.Context().Value(handleKey).(*handleList).AddHandle(rh)
rh = newHandle(request)
}

ret := C.frankenphp_update_server_context(
Expand All @@ -426,10 +421,18 @@ func updateServerContext(request *http.Request, create bool, mrh C.uintptr_t) er
)

if ret > 0 {
return RequestContextCreationError
rh.Delete()
handle(mrh).Delete()
return nil, RequestContextCreationError
}

return nil
if rh != 0 {
return &rh, nil
}

rh = handle(mrh)

return &rh, nil
}

// ServeHTTP executes a PHP script according to the given context.
Expand Down Expand Up @@ -468,19 +471,17 @@ func go_handle_request() bool {
return false

case r := <-requestChan:
h := cgo.NewHandle(r)
r.Context().Value(handleKey).(*handleList).AddHandle(h)

fc, ok := FromContext(r.Context())
if !ok {
panic(InvalidRequestError)
}
defer func() {
maybeCloseContext(fc)
r.Context().Value(handleKey).(*handleList).FreeAll()
}()

if err := updateServerContext(r, true, 0); err != nil {
rh, err := updateServerContext(r, true, 0)
if err != nil {
rh.Delete()
panic(err)
withinboredom marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -490,6 +491,8 @@ func go_handle_request() bool {
panic(ScriptExecutionError)
}

rh.Delete()

return true
}
}
Expand All @@ -502,7 +505,7 @@ func maybeCloseContext(fc *FrankenPHPContext) {

//export go_ub_write
func go_ub_write(rh C.uintptr_t, cBuf *C.char, length C.int) (C.size_t, C.bool) {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)
fc, _ := FromContext(r.Context())

var writer io.Writer
Expand Down Expand Up @@ -539,7 +542,7 @@ var headerKeyCache = func() otter.Cache[string, string] {

//export go_register_variables
func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

p := &runtime.Pinner{}
Expand Down Expand Up @@ -609,7 +612,7 @@ func go_register_variables(rh C.uintptr_t, trackVarsArray *C.zval) {
func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.uintptr_t) {
if rh == 0 {
// worker mode, not handling a request
mr := cgo.Handle(mrh).Value().(*http.Request)
mr := handle(mrh).Value().(*http.Request)
mfc := mr.Context().Value(contextKey).(*FrankenPHPContext)

if c := mfc.logger.Check(zap.DebugLevel, "apache_request_headers() called in non-HTTP context"); c != nil {
Expand All @@ -618,10 +621,10 @@ func go_apache_request_headers(rh, mrh C.uintptr_t) (*C.go_string, C.size_t, C.u

return nil, 0, 0
}
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)

pinner := &runtime.Pinner{}
pinnerHandle := C.uintptr_t(cgo.NewHandle(pinner))
pinnerHandle := C.uintptr_t(newHandle(pinner))

headers := make([]C.go_string, 0, len(r.Header)*2)

Expand Down Expand Up @@ -652,7 +655,7 @@ func go_apache_request_cleanup(rh C.uintptr_t) {
return
}

h := cgo.Handle(rh)
h := handle(rh)
p := h.Value().(*runtime.Pinner)
p.Unpin()
h.Delete()
Expand All @@ -671,7 +674,7 @@ func addHeader(fc *FrankenPHPContext, cString *C.char, length C.int) {

//export go_write_headers
func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

if fc.responseWriter == nil {
Expand Down Expand Up @@ -699,7 +702,7 @@ func go_write_headers(rh C.uintptr_t, status C.int, headers *C.zend_llist) {

//export go_sapi_flush
func go_sapi_flush(rh C.uintptr_t) bool {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

if fc.responseWriter == nil || clientHasClosed(r) {
Expand All @@ -715,7 +718,7 @@ func go_sapi_flush(rh C.uintptr_t) bool {

//export go_read_post
func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes C.size_t) {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)

p := unsafe.Slice((*byte)(unsafe.Pointer(cBuf)), countBytes)
var err error
Expand All @@ -730,7 +733,7 @@ func go_read_post(rh C.uintptr_t, cBuf *C.char, countBytes C.size_t) (readBytes

//export go_read_cookies
func go_read_cookies(rh C.uintptr_t) *C.char {
r := cgo.Handle(rh).Value().(*http.Request)
r := handle(rh).Value().(*http.Request)

cookies := r.Cookies()
if len(cookies) == 0 {
Expand Down
97 changes: 97 additions & 0 deletions handles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2021 The Go Authors. All rights reserved.
withinboredom marked this conversation as resolved.
Show resolved Hide resolved
// Use of this source code is governed by a BSD-style
// license that can be found at https://github.com/golang/go/blob/master/LICENSE.

package frankenphp

import (
"sync"
"sync/atomic"
)

// handle is based on the CL here: https://go-review.googlesource.com/c/go/+/600875
type handle uintptr

// slot represents a slot in the handles slice for concurrent access.
type slot struct {
value any
}

var (
handles []*atomic.Pointer[slot]
releasedIdx sync.Pool
nilSlot = &slot{}
growLock sync.RWMutex
handLen atomic.Uint64
)

func init() {
handles = make([]*atomic.Pointer[slot], 0)
handLen = atomic.Uint64{}
releasedIdx.New = func() interface{} {
return nil
}
growLock = sync.RWMutex{}
}

func newHandle(v any) handle {
var h uint64 = 1
s := &slot{value: v}
for {
if released := releasedIdx.Get(); released != nil {
h = released.(uint64)
}

if h > handLen.Load() {
growLock.Lock()
handles = append(handles, &atomic.Pointer[slot]{})
handLen.Store(uint64(len(handles)))
growLock.Unlock()
}

growLock.RLock()
if handles[h-1].CompareAndSwap(nilSlot, s) {
growLock.RUnlock()
return handle(h)
} else if handles[h-1].CompareAndSwap(nil, s) {
growLock.RUnlock()
return handle(h)
} else {
h++
}
growLock.RUnlock()
}
}

func (h handle) Value() any {
growLock.RLock()
defer growLock.RUnlock()
if h > handle(len(handles)) {
panic("runtime/cgo: misuse of an invalid handle")
}

v := handles[h-1].Load()
if v == nil || v == nilSlot {
panic("runtime/cgo: misuse of an released handle")
}

return v.value
}

func (h handle) Delete() {
growLock.RLock()
defer growLock.RUnlock()
if h == 0 {
panic("runtime/cgo: misuse of an zero handle")
}

if h > handle(len(handles)) {
panic("runtime/cgo: misuse of an invalid handle")
}

if v := handles[h-1].Swap(nilSlot); v == nil || v == nilSlot {
panic("runtime/cgo: misuse of an released handle")
}
//nolint:staticcheck
releasedIdx.Put(uint64(h))
}
Loading
Loading