Skip to content

Commit

Permalink
Sync/Wait for processing handler (#14)
Browse files Browse the repository at this point in the history
Use a key-specific wait for delayed additions into the handler map.
  • Loading branch information
toli-belo authored Aug 16, 2019
1 parent 10982be commit fb8c16d
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 13 deletions.
21 changes: 8 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
)

const (
WorkHandleDelay = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map.
WorkHandleTimeoutMs = 500 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map.
InProgressQueueSize = 8
)

Expand All @@ -52,7 +52,7 @@ type ConnOpenHandler func() (conn net.Conn, err error)
type Client struct {
reconnectState uint32
net, addr string
handlers sync.Map
handlers *HandlerMap
conn *connection
//rw *bufio.ReadWriter
chans *channels
Expand Down Expand Up @@ -169,6 +169,7 @@ func NewClient(connCloseHandler ConnCloseHandler,
net: addr.Network(),
addr: addr.String(),
conn: &connection{Conn: conn},
handlers: NewHandlerMap(),
chans: &channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)},
ResponseTimeout: DefaultTimeout,
responsePool: &sync.Pool{New: func() interface{} { return &Response{} }},
Expand Down Expand Up @@ -443,22 +444,16 @@ func (client *Client) process(resp *Response) {
// These alternate conditions should not happen so long as
// everyone is following the specification.
var handler interface{}
var ok bool
if handler, ok = client.handlers.Load(resp.Handle); !ok {
// possibly the response arrived faster than the job handler was added to client.handlers, we'll wait a bit and give it another try
time.Sleep(WorkHandleDelay * time.Millisecond)
if handler, ok = client.handlers.Load(resp.Handle); !ok {
client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle)))
}
}
if ok {
var ok = false
if handler, ok = client.handlers.Get(resp.Handle, WorkHandleTimeoutMs); !ok {
client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle)))
} else {
if h, ok := handler.(ResponseHandler); ok {
h(resp)
} else {
client.err(errors.New(fmt.Sprintf("Could not cast handler to ResponseHandler for %v", resp.Handle)))
}
}

client.responsePool.Put(resp)
}
}
Expand Down Expand Up @@ -521,7 +516,7 @@ func (client *Client) Do(funcname string, payload []byte,

handle, err = client.submit(pt, funcname, payload)

client.handlers.Store(handle, h)
client.handlers.Put(handle, h)

return
}
Expand Down
115 changes: 115 additions & 0 deletions client/handler_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package client

import (
"container/list"
"golang.org/x/net/context"

"sync"
"time"
)

type HandlerMap struct {
mu sync.Mutex
innerMap map[string]ResponseHandler
waitersMap map[string]*list.List
}

type waiter struct {
ready chan<- struct{} // Closed when semaphore acquired.
}

func NewHandlerMap() *HandlerMap {
return &HandlerMap{sync.Mutex{},
make(map[string]ResponseHandler, 100),
make(map[string]*list.List, 100),
}
}

func (m *HandlerMap) GetCounts() (counts int, waiters int) {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.innerMap), len(m.waitersMap)
}

func (m *HandlerMap) Put(key string, value ResponseHandler) {
m.mu.Lock()
defer m.mu.Unlock()
m.innerMap[key] = value
// signal to any waiters here
if waiters, ok := m.waitersMap[key]; ok {
for {
next := waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
waiters.Remove(next)
close(w.ready)
}
delete(m.waitersMap, key)
}
}

func (m *HandlerMap) Delete(key string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.innerMap, key)
}

func (m *HandlerMap) Get(key string, timeoutMs int) (value ResponseHandler, ok bool) {
m.mu.Lock()

// optimistic check first
value, ok = m.innerMap[key]
if ok {
m.mu.Unlock()
return
}

// let's remember the current time
curTime := time.Now()
maxTime := curTime.Add(time.Duration(timeoutMs) * time.Millisecond)

for time.Now().Before(maxTime) && !ok {
value, ok = m.innerMap[key]
if !ok {
nsLeft := maxTime.Sub(time.Now()).Nanoseconds()
ctx, _ := context.WithTimeout(context.Background(), time.Duration(nsLeft)*time.Nanosecond)

waiters, wok := m.waitersMap[key]
if !wok {
waiters = &list.List{}
m.waitersMap[key] = waiters
}
ready := make(chan struct{})
w := waiter{ready: ready}
elem := waiters.PushBack(w)
m.mu.Unlock() // unlock before we start waiting on stuff

select {
case <-ctx.Done():
m.mu.Lock()
select {
case <-ready:
// in case we got signalled during cancellation
continue
default:
// we got timeout, let's remove
waiters.Remove(elem)
if waiters.Len() == 0 {
delete(m.waitersMap, key)
}
}
m.mu.Unlock()
return

case <-ready:
m.mu.Lock() // going back to the loop, gotta lock
continue
}
}
}

m.mu.Unlock()
return
}
100 changes: 100 additions & 0 deletions client/handler_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package client

import (
"github.com/stretchr/testify/assert"
"math"
"testing"
"time"
)

const (
testKey = "test_key"
timeoutMs = 200
marginErrorPct = 10
)

func getMsSince(startTime time.Time) int {
return int(time.Now().Sub(startTime).Nanoseconds() / 1e6)
}

func TestHandlerMapEarlyStoreRetrieve(t *testing.T) {

handler_map := NewHandlerMap()
var handler ResponseHandler = func(*Response) {
t.Logf("test: got a response \n")
}
handler_map.Put(testKey, handler)
myHandler, ok := handler_map.Get(testKey, timeoutMs)
if !ok {
t.Error("Failed to get test key")
}
myHandler(nil)

}

func TestHandlerMapDelayedPutRetrieve(t *testing.T) {

handler_map := NewHandlerMap()
startTime := time.Now()
expectedResponseMs := timeoutMs / 2

go func() {
time.Sleep(time.Duration(expectedResponseMs) * time.Millisecond)

// at this point the Get would be waiting for the response.
counts, waiters := handler_map.GetCounts()
assert.Equal(t, 0, counts, "Map Elements")
assert.Equal(t, 1, waiters, "Waiter groups")

var handler ResponseHandler = func(*Response) {
t.Logf("test: got a response at time %d ms after start\n", getMsSince(startTime))
}
handler_map.Put(testKey, handler)
}()

t.Logf("test: started waiting for key at %d ms after start\n", getMsSince(startTime))
myHandler, ok := handler_map.Get(testKey, timeoutMs)
if !ok {
t.Error("Failed to get test key")
} else {
myHandler(nil)
actualResponseMs := getMsSince(startTime)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualResponseMs-expectedResponseMs))/float64(expectedResponseMs) < float64(marginErrorPct)/100
}
assert.Condition(t, comp, "Response did not arrive within %d%% margin, expected time %d ms", marginErrorPct, expectedResponseMs)

}

}

func TestHandlerMapTimeoutPutTooLate(t *testing.T) {

handler_map := NewHandlerMap()
startTime := time.Now()

go func() {
time.Sleep(2 * timeoutMs * time.Millisecond)
handler_map.Put(testKey, func(*Response) {})
}()

t.Logf("test: started waiting for key at %d ms after start\n", getMsSince(startTime))
_, ok := handler_map.Get(testKey, timeoutMs)
if ok {
t.Error("Should have timed out when getting the key")
return
} else {
actualTimeoutMs := getMsSince(startTime)
t.Logf("test: timed out waiting for key at %d ms after start\n", actualTimeoutMs)
var comp assert.Comparison = func() (success bool) {
return math.Abs(float64(actualTimeoutMs-timeoutMs))/timeoutMs < float64(marginErrorPct)/100
}
assert.Condition(t, comp, "Timeout did not occur within %d%% margin, expected timeout ms: %d", marginErrorPct, timeoutMs)
// wait till producer has added the element
time.Sleep(3 * timeoutMs * time.Millisecond)
counts, waiters := handler_map.GetCounts()
assert.Equal(t, 1, counts, "Map elements")
assert.Equal(t, 0, waiters, "Waiter groups")
}

}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ require (
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5
Expand Down

0 comments on commit fb8c16d

Please sign in to comment.