diff --git a/client/client.go b/client/client.go
index d0c384c..d33f74f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -25,7 +25,7 @@ var (
 )
 
 const (
-	WorkHandleDelay     = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map.
+	WorkHandleTimeoutMs = 500 // maximum time in milliseconds to wait for a job's response handler to be stored in the handler map before its work response is reported as unexpected.
 	InProgressQueueSize = 8
 )
 
@@ -52,7 +52,7 @@ type ConnOpenHandler func() (conn net.Conn, err error)
 type Client struct {
 	reconnectState uint32
 	net, addr      string
-	handlers       sync.Map
+	handlers       *HandlerMap
 	conn           *connection
 	//rw *bufio.ReadWriter
 	chans *channels
@@ -169,6 +169,7 @@ func NewClient(connCloseHandler ConnCloseHandler,
 		net:             addr.Network(),
 		addr:            addr.String(),
 		conn:            &connection{Conn: conn},
+		handlers:        NewHandlerMap(),
 		chans:           &channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)},
 		ResponseTimeout: DefaultTimeout,
 		responsePool:    &sync.Pool{New: func() interface{} { return &Response{} }},
@@ -443,22 +444,16 @@ func (client *Client) process(resp *Response) {
 		// These alternate conditions should not happen so long as
 		// everyone is following the specification.
 		var handler interface{}
-		var ok bool
-		if handler, ok = client.handlers.Load(resp.Handle); !ok {
-			// possibly the response arrived faster than the job handler was added to client.handlers, we'll wait a bit and give it another try
-			time.Sleep(WorkHandleDelay * time.Millisecond)
-			if handler, ok = client.handlers.Load(resp.Handle); !ok {
-				client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle)))
-			}
-		}
-		if ok {
+		var ok = false
+		if handler, ok = client.handlers.Get(resp.Handle, WorkHandleTimeoutMs); !ok {
+			client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle)))
+		} else {
 			if h, ok := handler.(ResponseHandler); ok {
 				h(resp)
 			} else {
 				client.err(errors.New(fmt.Sprintf("Could not cast handler to ResponseHandler for %v", resp.Handle)))
 			}
 		}
-		client.responsePool.Put(resp)
 	}
 }
 
@@ -521,7 +516,7 @@ func (client *Client) Do(funcname string, payload []byte,
 
 	handle, err = client.submit(pt, funcname, payload)
 
-	client.handlers.Store(handle, h)
+	client.handlers.Put(handle, h)
 
 	return
 }
diff --git a/client/handler_map.go b/client/handler_map.go
new file mode 100644
index 0000000..2304007
--- /dev/null
+++ b/client/handler_map.go
@@ -0,0 +1,124 @@
+package client
+
+import (
+	"container/list"
+	"golang.org/x/net/context"
+
+	"sync"
+	"time"
+)
+
+// HandlerMap maps job handles to their response handlers and lets a reader
+// wait, with a timeout, for a handle that has not been stored yet.
+type HandlerMap struct {
+	mu         sync.Mutex
+	innerMap   map[string]ResponseHandler
+	waitersMap map[string]*list.List
+}
+
+// waiter is one Get call blocked on a key that is not in the map yet.
+type waiter struct {
+	ready chan<- struct{} // Closed by Put once a handler is stored for the key.
+}
+
+// NewHandlerMap returns an empty, ready-to-use HandlerMap.
+func NewHandlerMap() *HandlerMap {
+	return &HandlerMap{
+		innerMap:   make(map[string]ResponseHandler, 100),
+		waitersMap: make(map[string]*list.List, 100),
+	}
+}
+
+// GetCounts returns the number of stored handlers and the number of keys
+// that currently have at least one blocked Get.
+func (m *HandlerMap) GetCounts() (counts int, waiters int) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return len(m.innerMap), len(m.waitersMap)
+}
+
+// Put stores value under key and wakes every Get blocked on that key.
+func (m *HandlerMap) Put(key string, value ResponseHandler) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.innerMap[key] = value
+	// signal to any waiters here
+	if waiters, ok := m.waitersMap[key]; ok {
+		for {
+			next := waiters.Front()
+			if next == nil {
+				break // No more waiters blocked.
+			}
+			w := next.Value.(waiter)
+			waiters.Remove(next)
+			close(w.ready)
+		}
+		delete(m.waitersMap, key)
+	}
+}
+
+// Delete removes the handler stored under key, if any.
+func (m *HandlerMap) Delete(key string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	delete(m.innerMap, key)
+}
+
+// Get returns the handler stored under key. If the key is absent it waits up
+// to timeoutMs milliseconds for a Put on that key; ok is false when no handler
+// was stored before the timeout expired.
+func (m *HandlerMap) Get(key string, timeoutMs int) (value ResponseHandler, ok bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	// optimistic check first
+	if value, ok = m.innerMap[key]; ok {
+		return
+	}
+
+	// A single deadline bounds the whole call. cancel releases the context's
+	// timer on every return path instead of leaving it armed until it fires.
+	deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
+	ctx, cancel := context.WithDeadline(context.Background(), deadline)
+	defer cancel()
+
+	for {
+		waiters, found := m.waitersMap[key]
+		if !found {
+			waiters = list.New()
+			m.waitersMap[key] = waiters
+		}
+		ready := make(chan struct{})
+		elem := waiters.PushBack(waiter{ready: ready})
+
+		m.mu.Unlock() // never block while holding the lock
+		select {
+		case <-ready:
+		case <-ctx.Done():
+		}
+		m.mu.Lock()
+
+		// Put closes ready and unlinks the waiter under the lock, so this
+		// non-blocking receive tells us whether we are still queued.
+		signalled := false
+		select {
+		case <-ready:
+			signalled = true
+		default:
+			// we got timeout, let's remove
+			waiters.Remove(elem)
+			if waiters.Len() == 0 {
+				delete(m.waitersMap, key)
+			}
+		}
+
+		// Re-read the map even when the deadline has passed: a Put that
+		// raced with the timeout must still be honoured.
+		value, ok = m.innerMap[key]
+		if ok || !signalled {
+			return
+		}
+		// Signalled but the entry was deleted again: wait for the next Put.
+		// Once the deadline has passed the next round returns immediately.
+	}
+}
diff --git a/client/handler_map_test.go b/client/handler_map_test.go
new file mode 100644
index 0000000..1ac036a
--- /dev/null
+++ b/client/handler_map_test.go
@@ -0,0 +1,100 @@
+package client
+
+import (
+	"github.com/stretchr/testify/assert"
+	"math"
+	"testing"
+	"time"
+)
+
+const (
+	testKey        = "test_key"
+	timeoutMs      = 200
+	marginErrorPct = 10
+)
+
+func getMsSince(startTime time.Time) int {
+	return int(time.Now().Sub(startTime).Nanoseconds() / 1e6)
+}
+
+func TestHandlerMapEarlyStoreRetrieve(t *testing.T) {
+
+	handler_map := NewHandlerMap()
+	var handler ResponseHandler = func(*Response) {
+		t.Logf("test: got a response \n")
+	}
+	handler_map.Put(testKey, handler)
+	myHandler, ok := handler_map.Get(testKey, timeoutMs)
+	if !ok {
+		t.Error("Failed to get test key")
+	}
+	myHandler(nil)
+
+}
+
+func TestHandlerMapDelayedPutRetrieve(t *testing.T) {
+
+	handler_map := NewHandlerMap()
+	startTime := time.Now()
+	expectedResponseMs := timeoutMs / 2
+
+	go func() {
+		time.Sleep(time.Duration(expectedResponseMs) * time.Millisecond)
+
+		// at this point the Get would be waiting for the response.
+		counts, waiters := handler_map.GetCounts()
+		assert.Equal(t, 0, counts, "Map Elements")
+		assert.Equal(t, 1, waiters, "Waiter groups")
+
+		var handler ResponseHandler = func(*Response) {
+			t.Logf("test: got a response at time %d ms after start\n", getMsSince(startTime))
+		}
+		handler_map.Put(testKey, handler)
+	}()
+
+	t.Logf("test: started waiting for key at %d ms after start\n", getMsSince(startTime))
+	myHandler, ok := handler_map.Get(testKey, timeoutMs)
+	if !ok {
+		t.Error("Failed to get test key")
+	} else {
+		myHandler(nil)
+		actualResponseMs := getMsSince(startTime)
+		var comp assert.Comparison = func() (success bool) {
+			return math.Abs(float64(actualResponseMs-expectedResponseMs))/float64(expectedResponseMs) < float64(marginErrorPct)/100
+		}
+		assert.Condition(t, comp, "Response did not arrive within %d%% margin, expected time %d ms", marginErrorPct, expectedResponseMs)
+
+	}
+
+}
+
+func TestHandlerMapTimeoutPutTooLate(t *testing.T) {
+
+	handler_map := NewHandlerMap()
+	startTime := time.Now()
+
+	go func() {
+		time.Sleep(2 * timeoutMs * time.Millisecond)
+		handler_map.Put(testKey, func(*Response) {})
+	}()
+
+	t.Logf("test: started waiting for key at %d ms after start\n", getMsSince(startTime))
+	_, ok := handler_map.Get(testKey, timeoutMs)
+	if ok {
+		t.Error("Should have timed out when getting the key")
+		return
+	} else {
+		actualTimeoutMs := getMsSince(startTime)
+		t.Logf("test: timed out waiting for key at %d ms after start\n", actualTimeoutMs)
+		var comp assert.Comparison = func() (success bool) {
+			return math.Abs(float64(actualTimeoutMs-timeoutMs))/timeoutMs < float64(marginErrorPct)/100
+		}
+		assert.Condition(t, comp, "Timeout did not occur within %d%% margin, expected timeout ms: %d", marginErrorPct, timeoutMs)
+		// wait till producer has added the element
+		time.Sleep(3 * timeoutMs * time.Millisecond)
+		counts, waiters := handler_map.GetCounts()
+		assert.Equal(t, 1, counts, "Map elements")
+		assert.Equal(t, 0, waiters, "Waiter groups")
+	}
+
+}
diff --git a/go.mod b/go.mod
index b787c9e..417e50b 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,8 @@ require (
 	github.com/spf13/pflag v1.0.1
 	github.com/stretchr/testify v1.3.0
 	github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
+	golang.org/x/net v0.0.0-20180906233101-161cd47e91fd
+	golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
 	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
 	gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
 	gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5