Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: pending waiter #497

Merged
merged 5 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.dylib
debug
bin/
logs/

# Test binary, build with `go test -c`
*.test
Expand Down
2 changes: 1 addition & 1 deletion pending/pendinghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type (
)

// NewHandler new instance of PendingHandler
func NewHandler(size int) Handler {
func NewHandler() Handler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizedoes not matter 😉

return Handler{
cond: sync.NewCond(&sync.Mutex{}),
}
Expand Down
61 changes: 17 additions & 44 deletions session/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/qlik-oss/gopherciser/globals/constant"
"github.com/qlik-oss/gopherciser/helpers"
"github.com/qlik-oss/gopherciser/logger"
"github.com/qlik-oss/gopherciser/pending"
"github.com/qlik-oss/gopherciser/runid"
"github.com/qlik-oss/gopherciser/statistics"
"github.com/qlik-oss/gopherciser/version"
Expand All @@ -38,14 +39,13 @@ type (

// RestHandler handles waiting for pending requests and responses
RestHandler struct {
reqCounterCond *sync.Cond
reqCounter int
timeout time.Duration
Client *http.Client
trafficLogger enigma.TrafficLogger
headers *HeaderJar
virtualProxy string
ctx context.Context
timeout time.Duration
Client *http.Client
trafficLogger enigma.TrafficLogger
headers *HeaderJar
virtualProxy string
ctx context.Context
pending *pending.Handler
}

// RestRequest represents a REST request and its response
Expand Down Expand Up @@ -188,15 +188,14 @@ func addCustomHeaders(req *http.Request, logEntry *logger.LogEntry) {
}

// NewRestHandler new instance of RestHandler
func NewRestHandler(ctx context.Context, trafficLogger enigma.TrafficLogger, headerjar *HeaderJar, virtualProxy string, timeout time.Duration) *RestHandler {
func NewRestHandler(ctx context.Context, trafficLogger enigma.TrafficLogger, headerjar *HeaderJar, virtualProxy string, timeout time.Duration, pendingHandler *pending.Handler) *RestHandler {
return &RestHandler{
reqCounter: 0,
reqCounterCond: sync.NewCond(&sync.Mutex{}),
trafficLogger: trafficLogger,
headers: headerjar,
virtualProxy: virtualProxy,
timeout: timeout,
ctx: ctx,
trafficLogger: trafficLogger,
headers: headerjar,
virtualProxy: virtualProxy,
timeout: timeout,
ctx: ctx,
pending: pendingHandler,
}
}

Expand Down Expand Up @@ -229,32 +228,6 @@ func (method RestMethod) String() string {
return str
}

// WaitForPending uses double locking of mutex to wait until mutex is unlocked by
// loop listening for pending req/resp
func (handler *RestHandler) WaitForPending() {
handler.reqCounterCond.L.Lock()
for handler.reqCounter > 0 {
handler.reqCounterCond.Wait()
}
handler.reqCounterCond.L.Unlock()
}

// IncPending increase pending requests
func (handler *RestHandler) IncPending() {
handler.reqCounterCond.L.Lock()
handler.reqCounter++
handler.reqCounterCond.Broadcast()
handler.reqCounterCond.L.Unlock()
}

// DecPending increase finished requests
func (handler *RestHandler) DecPending(request *RestRequest) {
handler.reqCounterCond.L.Lock()
handler.reqCounter--
handler.reqCounterCond.Broadcast()
handler.reqCounterCond.L.Unlock()
}

// DefaultClient creates client instance with default client settings
func DefaultClient(allowUntrusted bool, state *State) (*http.Client, error) {
// todo client values are currently from http.DefaultTransport, should choose better values depending on
Expand Down Expand Up @@ -567,12 +540,12 @@ func (handler *RestHandler) QueueRequest(actionState *action.State, failOnError
// QueueRequestWithCallback Async request with callback, set warnOnError to log warning instead of registering error for request
func (handler *RestHandler) QueueRequestWithCallback(actionState *action.State, failOnError bool,
request *RestRequest, logEntry *logger.LogEntry, callback func(err error, req *RestRequest)) {
handler.IncPending()
handler.pending.IncPending()

startTS := time.Now()
go func() {
stall := time.Since(startTS)
defer handler.DecPending(request)
defer handler.pending.DecPending()
var errRequest error
var panicErr error
failRequest := func(err error) {
Expand Down
10 changes: 7 additions & 3 deletions session/resthandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/qlik-oss/gopherciser/action"
"github.com/qlik-oss/gopherciser/enigmahandlers"
"github.com/qlik-oss/gopherciser/logger"
"github.com/qlik-oss/gopherciser/pending"
"github.com/stretchr/testify/assert"
)

Expand All @@ -29,16 +30,19 @@ func TestResthandler(t *testing.T) {
defer ts.Close()

actionState := action.State{}
pendingHandler := pending.NewHandler()

restHandler := NewRestHandler(context.Background(), &enigmahandlers.TrafficLogger{}, NewHeaderJar(), "", 10*time.Second)
restHandler := NewRestHandler(context.Background(), &enigmahandlers.TrafficLogger{}, NewHeaderJar(), "", 10*time.Second, &pendingHandler)
restHandler.Client = http.DefaultClient
getRequest := RestRequest{
Method: GET,
ContentType: "application/json",
Destination: ts.URL,
}
restHandler.QueueRequest(&actionState, true, &getRequest, &logger.LogEntry{})
restHandler.WaitForPending()
timeOutContext, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
pendingHandler.WaitForPending(timeOutContext)

assert.Equal(t, "get!", string(getRequest.ResponseBody))

Expand All @@ -49,7 +53,7 @@ func TestResthandler(t *testing.T) {
Content: []byte("data!"),
}
restHandler.QueueRequest(&actionState, true, &postRequest, &logger.LogEntry{})
restHandler.WaitForPending()
pendingHandler.WaitForPending(timeOutContext)

assert.Equal(t, "data!", string(postRequest.Content))
}
Expand Down
24 changes: 9 additions & 15 deletions session/sessionstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,13 @@ func newSessionState(ctx context.Context, outputsDir string, timeout time.Durati
sessionCtx, cancel := context.WithCancel(ctx)

state := &State{
Timeout: timeout,
ArtifactMap: NewArtifactMap(),
OutputsDir: outputsDir,
User: user,
HeaderJar: NewHeaderJar(),
VirtualProxy: virtualProxy,
// Buffer size for the pendingHandler has been chosen after evaluation tests towards sense
// with medium amount of objects in the sheets. Evaluation was done before introducing spinLoopPending
// in pendingHandler and could possibly be lowered, this would however require re-evaluation.
Pending: pending.NewHandler(32),
Timeout: timeout,
ArtifactMap: NewArtifactMap(),
OutputsDir: outputsDir,
User: user,
HeaderJar: NewHeaderJar(),
VirtualProxy: virtualProxy,
Pending: pending.NewHandler(),
RequestMetrics: &requestmetrics.RequestMetrics{},
Counters: counters,
customStates: make(map[string]interface{}),
Expand All @@ -222,7 +219,7 @@ func newSessionState(ctx context.Context, outputsDir string, timeout time.Durati
events: make(map[int]*Event),
reconnect: ReconnectInfo{
reconnectFunc: nil,
pendingReconnection: pending.NewHandler(32),
pendingReconnection: pending.NewHandler(),
},
}

Expand Down Expand Up @@ -286,7 +283,7 @@ func (state *State) SetLogEntry(entry *logger.LogEntry) {
state.trafficLogger = enigmahandlers.NewTrafficRequestCounter(state.Counters)
}

state.Rest = NewRestHandler(state.ctx, state.trafficLogger, state.HeaderJar, state.VirtualProxy, state.Timeout)
state.Rest = NewRestHandler(state.ctx, state.trafficLogger, state.HeaderJar, state.VirtualProxy, state.Timeout, &state.Pending)
}

// TrafficLogger returns the current trafficLogger
Expand Down Expand Up @@ -336,9 +333,6 @@ func (state *State) IsAbortTriggered() bool {
// Wait for all pending requests to finish, returns true if action state has been marked as failed
func (state *State) Wait(actionState *action.State) bool {
state.Pending.WaitForPending(state.ctx)
if state.Rest != nil {
state.Rest.WaitForPending()
}
return actionState.Failed
}

Expand Down
42 changes: 41 additions & 1 deletion session/sessionstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/goccy/go-json"
"github.com/pkg/errors"
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestState_SessionVariables(t *testing.T) {
func setupStateForCLTest() (*State, *eventCounter, *eventCounter, *eventCounter, *eventCounter) {
counters := &statistics.ExecutionCounters{}
state := New(context.Background(), "", 60, nil, 1, 1, "", false, counters)
state.Rest = NewRestHandler(state.ctx, state.trafficLogger, state.HeaderJar, state.VirtualProxy, state.Timeout)
state.Rest = NewRestHandler(state.ctx, state.trafficLogger, state.HeaderJar, state.VirtualProxy, state.Timeout, &state.Pending)

event0 := registerEvent(state, 0)
event1 := registerEvent(state, 1)
Expand Down Expand Up @@ -403,3 +404,42 @@ func TestStateTemplateArtifactMap(t *testing.T) {
})

}

func TestPendingWaiter(t *testing.T) {
counters := &statistics.ExecutionCounters{}
state := New(context.Background(), "", 120, nil, 1, 1, "", false, counters)
Comment on lines +408 to +410
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this simple test break the old one? If so, good! If not it would be a good idea to create such a test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I created it as a failing test with old one, and doesn't fail with new implementation

state.Rest = NewRestHandler(state.ctx, state.trafficLogger, state.HeaderJar, state.VirtualProxy, state.Timeout, &state.Pending)
actionState := &action.State{}

firstDone := false
secondDone := false
thirdDone := false

state.QueueRequest(func(ctx context.Context) error {
<-time.After(50 * time.Millisecond)
firstDone = true
return nil
}, actionState, true, "")

state.Rest.QueueRequestWithCallback(actionState, true, nil, state.LogEntry, func(err error, req *RestRequest) {
<-time.After(100 * time.Millisecond)
secondDone = true
state.QueueRequest(func(ctx context.Context) error {
<-time.After(500 * time.Millisecond)
thirdDone = true
return nil
}, actionState, true, "")
})

state.Wait(actionState)

if !firstDone {
t.Error("first request not waited for")
}
if !secondDone {
t.Error("second request not waited for")
}
if !thirdDone {
t.Error("third request not waited for")
}
}