Skip to content

Commit

Permalink
Fix send/receive synchronization race condition (#12)
Browse files Browse the repository at this point in the history
* Fix send/receive synchronization race condition

* restore cleanup and submitJob name

* Rename submitJob

* Add locking of job submission to single-thread it

* Add some logging

* Fix premature dequeuing issue

* Remove extra logging

* Remove submit locks

* Remove drain debug logging

* rename constants and change queue size to 8
  • Loading branch information
toli-belo authored May 10, 2019
1 parent 6460f71 commit ec987fb
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 31 deletions.
70 changes: 43 additions & 27 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var (
)

const (
// WORK_HANDLE_DELAY_MS is a delay expressed as a bare number of milliseconds;
// the retry path multiplies it by time.Millisecond before sleeping.
WORK_HANDLE_DELAY_MS = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map.
// WorkHandleDelay is the number of milliseconds to wait before looking up a
// job handler a second time when a work response arrives before its handler
// has been stored. It is a plain number: multiply by time.Millisecond at the
// point of use.
WorkHandleDelay = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map.
// InProgressQueueSize is the buffer capacity of the inProgress channel, which
// holds requests that have been written and are awaiting their response.
InProgressQueueSize = 8
)

type connection struct {
Expand All @@ -39,8 +40,8 @@ type connection struct {
}

type channels struct {
outbound chan *request
expected chan *Response
inProgress chan *request
outbound chan *request
}

// ConnCloseHandler is a callback that receives a net.Conn and returns an
// error. NewClient accepts a value of this type as its connCloseHandler
// argument.
// NOTE(review): presumably invoked when the client's connection is closed —
// confirm against the call site.
type ConnCloseHandler func(conn net.Conn) (err error)
Expand Down Expand Up @@ -165,12 +166,10 @@ func NewClient(connCloseHandler ConnCloseHandler,
addr := conn.RemoteAddr()

client = &Client{
net: addr.Network(),
addr: addr.String(),
conn: &connection{Conn: conn},
chans: &channels{
expected: make(chan *Response),
outbound: make(chan *request)},
net: addr.Network(),
addr: addr.String(),
conn: &connection{Conn: conn},
chans: &channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)},
ResponseTimeout: DefaultTimeout,
responsePool: &sync.Pool{New: func() interface{} { return &Response{} }},
requestPool: &sync.Pool{New: func() interface{} { return &request{} }},
Expand Down Expand Up @@ -217,6 +216,7 @@ func (client *Client) writeLoop() {

conn := client.loadConn()
if conn == nil {
req.close()
client.requestPool.Put(req)
return
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (client *Client) writeLoop() {
}
}

client.requestPool.Put(req)
chans.inProgress <- req
}
}

Expand Down Expand Up @@ -301,7 +301,8 @@ func (client *Client) reconnect(err error) error {
}

oldChans := client.loadChans()
close(oldChans.expected)
close(oldChans.inProgress)
client.drainInProgress()
close(oldChans.outbound)

conn, err := client.connOpenHandler()
Expand All @@ -321,14 +322,26 @@ func (client *Client) reconnect(err error) error {
// replace closed channels with new ones
_ = (*channels)(atomic.SwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&client.chans)),
unsafe.Pointer(&channels{expected: make(chan *Response), outbound: make(chan *request)})))
unsafe.Pointer(&channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)})))

go client.readLoop()
go client.writeLoop()

return nil
}

// drainInProgress empties the current in-progress queue, closing and
// recycling every request that was queued there but never matched with a
// response.
//
// The loop ranges over the channel until it is closed and empty, so the
// caller must close the inProgress channel before calling this (reconnect
// does); otherwise the call blocks waiting for further requests.
func (client *Client) drainInProgress() {
	// Deliberate best-effort cleanup: a panic raised while closing a request
	// (e.g. closing an already-closed channel) must not abort the caller.
	defer func() {
		recover()
	}()

	// Go through loadChans like every other accessor instead of reading
	// client.chans as a plain field: the pointer is replaced with
	// atomic.SwapPointer, so it should only be read atomically.
	for req := range client.loadChans().inProgress {
		req.close()
		client.requestPool.Put(req) // recycle here since it never got to be processed
	}
}

// loadConn atomically reads the client's current connection pointer and
// returns it as a *connection.
func (client *Client) loadConn() *connection {
	slot := (*unsafe.Pointer)(unsafe.Pointer(&client.conn))
	current := atomic.LoadPointer(slot)
	return (*connection)(current)
}
Expand Down Expand Up @@ -407,6 +420,7 @@ func (client *Client) readLoop() {
}

client.process(resp)

}

}
Expand All @@ -417,13 +431,14 @@ func (client *Client) process(resp *Response) {
// terminally should return it here.
switch resp.DataType {
case rt.PT_Error:

client.err(getError(resp.Data))

client.loadChans().expected <- resp

fallthrough
case rt.PT_StatusRes, rt.PT_JobCreated, rt.PT_EchoRes:
client.loadChans().expected <- resp
req := <-client.loadChans().inProgress
// recycle the request object, its 2nd life has ended
req.expected <- resp
req.close()
client.requestPool.Put(req)
case rt.PT_WorkComplete, rt.PT_WorkFail, rt.PT_WorkException:
defer client.handlers.Delete(resp.Handle)
fallthrough
Expand All @@ -434,7 +449,7 @@ func (client *Client) process(resp *Response) {
var ok bool
if handler, ok = client.handlers.Load(resp.Handle); !ok {
// possibly the response arrived faster than the job handler was added to client.handlers, we'll wait a bit and give it another try
time.Sleep(WORK_HANDLE_DELAY_MS * time.Millisecond)
time.Sleep(WorkHandleDelay * time.Millisecond)
if handler, ok = client.handlers.Load(resp.Handle); !ok {
client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle)))
}
Expand Down Expand Up @@ -463,7 +478,7 @@ func (client *Client) request() *request {
return client.requestPool.Get().(*request)
}

func (client *Client) submit(pt rt.PT, funcname string, payload []byte) (handle string, err error) {
func (client *Client) submit(reqType rt.PT, funcname string, payload []byte) (handle string, err error) {

defer func() {
if e := safeCastError(recover(), "panic in submit()"); e != nil {
Expand All @@ -472,9 +487,10 @@ func (client *Client) submit(pt rt.PT, funcname string, payload []byte) (handle
}()

chans := client.loadChans()
chans.outbound <- client.request().submitJob(pt, funcname, IdGen.Id(), payload)
req := client.request().submitJob(reqType, funcname, IdGen.Id(), payload)
chans.outbound <- req

if res := <-chans.expected; res != nil {
if res := <-req.expected; res != nil {
var err error
if res.DataType == rt.PT_Error {
err = getError(res.Data)
Expand Down Expand Up @@ -582,17 +598,17 @@ func (client *Client) Status(handle string) (status *Status, err error) {
}()

chans := client.loadChans()
chans.outbound <- client.request().status(handle)
req := client.request().status(handle)
chans.outbound <- req

res := <-chans.expected
res := <-req.expected

if res == nil {
return nil, errors.New("Status response queue is empty, please resend")
}
status, err = res.Status()

client.responsePool.Put(res)

return
}

Expand All @@ -606,16 +622,16 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
}()

chans := client.loadChans()
chans.outbound <- client.request().echo(data)
req := client.request().echo(data)
chans.outbound <- req

res := <-chans.expected
res := <-req.expected

if res == nil {
return nil, errors.New("Echo request got empty response, please resend")
}

echo = res.Data

client.responsePool.Put(res)

return
Expand Down
16 changes: 12 additions & 4 deletions client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@ import (

// Request from client
type request struct {
pt rt.PT
data [][]byte
pt rt.PT
expected chan *Response
data [][]byte
}

// close closes the request's response channel, if one has been allocated.
// A request whose expected channel is nil is left untouched.
func (req *request) close() {
	if req.expected == nil {
		return
	}
	close(req.expected)
}

// args replaces the request's payload with the given byte slices, reusing the
// existing backing array of req.data, and allocates a fresh one-slot buffered
// channel on which the response for this request is delivered.
func (req *request) args(args ...[]byte) {
	req.data = append(req.data[:0], args...)
	req.expected = make(chan *Response, 1)
}

func (req *request) submitJob(pt rt.PT, funcname, id string, arg []byte) *request {
req.pt = pt
func (req *request) submitJob(reqType rt.PT, funcname, id string, arg []byte) *request {
req.pt = reqType

req.args([]byte(funcname), []byte(id), arg)

Expand Down

0 comments on commit ec987fb

Please sign in to comment.