Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add future stateful lock #36333

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions internal/util/cgo/futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {

ctx, cancel := context.WithCancel(ctx)
future := &futureImpl{
closure: f,
ctx: ctx,
ctxCancel: cancel,
releaserOnce: sync.Once{},
future: cFuturePtr,
opts: options,
state: newFutureState(),
closure: f,
ctx: ctx,
ctxCancel: cancel,
future: cFuturePtr,
opts: options,
state: newFutureState(),
}

// register the future to do timeout notification.
Expand All @@ -106,29 +105,33 @@ func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future {
}

type futureImpl struct {
ctx context.Context
ctxCancel context.CancelFunc
future *C.CFuture
closure CGOAsyncFunction
opts *options
state futureState
releaserOnce sync.Once
ctx context.Context
ctxCancel context.CancelFunc
future *C.CFuture
closure CGOAsyncFunction
opts *options
state futureState
}

// Context return the context of the future.
func (f *futureImpl) Context() context.Context {
return f.ctx
}

// BlockUntilReady block until the future is ready or canceled.
func (f *futureImpl) BlockUntilReady() {
f.blockUntilReady()
}

// BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result.
func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
f.blockUntilReady()

if !f.state.intoConsumed() {
guard := f.state.LockForConsume()
if guard == nil {
return nil, ErrConsumed
}
defer guard.Unlock()

var ptr unsafe.Pointer
var status C.CStatus
Expand All @@ -144,21 +147,31 @@ func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) {
return ptr, err
}

// Release the resource of the future.
func (f *futureImpl) Release() {
// block until ready to release the future.
f.blockUntilReady()

guard := f.state.LockForRelease()
if guard == nil {
return
}
defer guard.Unlock()

// release the future.
getCGOCaller().call("future_destroy", func() {
C.future_destroy(f.future)
})
}

// cancel the future with error.
func (f *futureImpl) cancel(err error) {
if !f.state.checkUnready() {
// only unready future can be canceled.
// a ready future' cancel make no sense.
// only unready future can be canceled.
guard := f.state.LockForCancel()
if guard == nil {
return
}
defer guard.Unlock()

if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) {
getCGOCaller().call("future_cancel", func() {
Expand All @@ -169,8 +182,9 @@ func (f *futureImpl) cancel(err error) {
panic("unreachable: invalid cancel error type")
}

// blockUntilReady block until the future is ready or canceled.
func (f *futureImpl) blockUntilReady() {
if !f.state.checkUnready() {
if !f.state.CheckUnready() {
// only unready future should be block until ready.
return
}
Expand All @@ -183,10 +197,7 @@ func (f *futureImpl) blockUntilReady() {
mu.Lock()

// mark the future as ready at go side to avoid more cgo calls.
f.state.intoReady()
f.state.IntoReady()
// notify the future manager that the future is ready.
f.ctxCancel()
if f.opts.releaser != nil {
f.releaserOnce.Do(f.opts.releaser)
}
}
26 changes: 26 additions & 0 deletions internal/util/cgo/futures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,32 @@ func TestMain(m *testing.M) {
}
}

func TestFutureWithConcurrentReleaseAndCancel(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
future := createFutureWithTestCase(context.Background(), testCase{
interval: 100 * time.Millisecond,
loopCnt: 10,
caseNo: 100,
})
wg.Add(3)
// Double release should be ok.
go func() {
defer wg.Done()
future.Release()
}()
go func() {
defer wg.Done()
future.Release()
}()
go func() {
defer wg.Done()
future.cancel(context.DeadlineExceeded)
}()
}
wg.Wait()
}

func TestFutureWithSuccessCase(t *testing.T) {
// Test success case.
future := createFutureWithTestCase(context.Background(), testCase{
Expand Down
14 changes: 2 additions & 12 deletions internal/util/cgo/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,17 @@ package cgo

func getDefaultOpt() *options {
return &options{
name: "unknown",
releaser: nil,
name: "unknown",
}
}

type options struct {
name string
releaser func()
name string
}

// Opt is the option type for future.
type Opt func(*options)

// WithReleaser sets the releaser function.
// When a future is ready, the releaser function will be called once.
func WithReleaser(releaser func()) Opt {
return func(o *options) {
o.releaser = releaser
}
}

// WithName sets the name of the future.
// Only used for metrics.
func WithName(name string) Opt {
Expand Down
87 changes: 74 additions & 13 deletions internal/util/cgo/state.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,99 @@
package cgo

import "go.uber.org/atomic"
import (
"sync"
)

const (
stateUnready int32 = iota
stateUnready state = iota
stateReady
stateConsumed
stateDestoryed
)

// newFutureState creates a new futureState.
func newFutureState() futureState {
return futureState{
inner: atomic.NewInt32(stateUnready),
mu: sync.Mutex{},
inner: stateUnready,
}
}

type state int32

// futureState is a state machine for future.
// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed
type futureState struct {
inner *atomic.Int32
mu sync.Mutex
inner state
}

// intoReady sets the state to ready.
func (s *futureState) intoReady() {
s.inner.CompareAndSwap(stateUnready, stateReady)
// LockForCancel locks the state for cancel.
func (s *futureState) LockForCancel() *lockGuard {
s.mu.Lock()
// only unready future can be canceled.
// cancel on a ready future make no sense.
if s.inner != stateUnready {
s.mu.Unlock()
return nil
}
return &lockGuard{
locker: s,
target: stateUnready,
}
}

// intoConsumed sets the state to consumed.
// if the state is not ready, it does nothing and returns false.
func (s *futureState) intoConsumed() bool {
return s.inner.CompareAndSwap(stateReady, stateConsumed)
// LockForConsume locks the state for consume.
func (s *futureState) LockForConsume() *lockGuard {
s.mu.Lock()
if s.inner != stateReady {
s.mu.Unlock()
return nil
}
return &lockGuard{
locker: s,
target: stateConsumed,
}
}

// LockForRelease locks the state for release.
func (s *futureState) LockForRelease() *lockGuard {
s.mu.Lock()
if s.inner != stateReady && s.inner != stateConsumed {
s.mu.Unlock()
return nil
}
return &lockGuard{
locker: s,
target: stateDestoryed,
}
}

// checkUnready checks if the state is unready.
func (s *futureState) checkUnready() bool {
return s.inner.Load() == stateUnready
func (s *futureState) CheckUnready() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.inner == stateUnready
}

// IntoReady changes the state to ready.
func (s *futureState) IntoReady() {
s.mu.Lock()
if s.inner == stateUnready {
s.inner = stateReady
}
s.mu.Unlock()
}

// lockGuard is a guard for futureState.
type lockGuard struct {
locker *futureState
target state
}

// Unlock unlocks the state.
func (lg *lockGuard) Unlock() {
lg.locker.inner = lg.target
lg.locker.mu.Unlock()
}
Loading