From 86041e76cca7342af419bcf4e1b8b1f4f954a36c Mon Sep 17 00:00:00 2001 From: Paul Kim Date: Mon, 10 Jan 2022 16:03:29 +0900 Subject: [PATCH 1/5] feat: remove concurrent query cache mutex --- bin/v0.34.x/rpc/cache.go | 81 ++++++++-------------------------------- 1 file changed, 15 insertions(+), 66 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index ac09e22..cbe1846 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -2,11 +2,10 @@ package rpc import ( "fmt" + lru "github.com/hashicorp/golang-lru" "net/http" "net/http/httptest" "sync" - - lru "github.com/hashicorp/golang-lru" ) type ResponseCache struct { @@ -20,11 +19,7 @@ type CacheBackend struct { cacheServeCount uint64 serveCount uint64 cacheType string - mtx *sync.RWMutex - - // subscribe to cache for same request URI - resultChan map[string]chan *ResponseCache - subscribeCount map[string]int + mtx *sync.Mutex } func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { @@ -40,22 +35,17 @@ func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { cacheServeCount: 0, serveCount: 0, cacheType: cacheType, - mtx: new(sync.RWMutex), - resultChan: make(map[string]chan *ResponseCache), - subscribeCount: make(map[string]int), + mtx: new(sync.Mutex), } } -func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) *ResponseCache { - response := &ResponseCache{ +func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) { + if evicted := cb.lru.Add(cacheKey, &ResponseCache{ status: status, body: body, - } - if evicted := cb.lru.Add(cacheKey, response); evicted != false { + }); evicted != false { cb.evictionCount++ } - - return response } func (cb *CacheBackend) Get(cacheKey string) *ResponseCache { @@ -84,8 +74,6 @@ func (cb *CacheBackend) Purge() { cb.evictionCount = 0 cb.cacheServeCount = 0 cb.serveCount = 0 - cb.resultChan = make(map[string]chan *ResponseCache) - cb.subscribeCount = make(map[string]int) cb.mtx.Unlock() } @@ -94,11 +82,9 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht cb.serveCount++ cb.mtx.Unlock() - uri := request.URL.String() - - // see if this request is already made, and in transit // set response type as json writer.Header().Set("Content-Type", "application/json") + writer.Header().Set("Connection", "close") cached := cb.Get(request.URL.String()) // if cached, return as is @@ -112,52 +98,15 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht return } - cb.mtx.Lock() - resChan, isInTransit := cb.resultChan[uri] - - // if isInTransit is false, this is the first time we're processing this query - // run actual querier - if !isInTransit { - cb.resultChan[uri] = make(chan *ResponseCache) - cb.subscribeCount[uri] = 0 - cb.mtx.Unlock() - - recorder := httptest.NewRecorder() + recorder := httptest.NewRecorder() - // process request - handler.ServeHTTP(recorder, request) - - // set in cache - responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) - - // write - writer.WriteHeader(recorder.Code) - writer.Write(recorder.Body.Bytes()) - - // feed all subscriptions - cb.mtx.RLock() - c := cb.resultChan[uri] - subscribeCount := cb.subscribeCount[uri] - cb.mtx.RUnlock() - for i := 0; i < subscribeCount; i++ { - c <- responseCacheBody - } - - cb.mtx.Lock() - delete(cb.subscribeCount, uri) - delete(cb.resultChan, uri) - cb.mtx.Unlock() - - return - } - - // same query is processing but not cached yet. - // subscribe for cache result here. - cb.subscribeCount[uri]++ - cb.mtx.Unlock() + // process request + handler.ServeHTTP(recorder, request) - response := <-resChan + // set in cache + cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) - writer.WriteHeader(response.status) - writer.Write(response.body) + // write + writer.WriteHeader(recorder.Code) + writer.Write(recorder.Body.Bytes()) } From ba1fa1cf13eb2aa4138077bd8d38d0ac25ec9424 Mon Sep 17 00:00:00 2001 From: Jeff Woo Date: Mon, 10 Jan 2022 18:22:50 +0900 Subject: [PATCH 2/5] Revert "feat: remove concurrent query cache mutex" This reverts commit 7b39f856e59c536b88046780960b34dc484f9128. --- bin/v0.34.x/rpc/cache.go | 81 ++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index cbe1846..ac09e22 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -2,10 +2,11 @@ package rpc import ( "fmt" - lru "github.com/hashicorp/golang-lru" "net/http" "net/http/httptest" "sync" + + lru "github.com/hashicorp/golang-lru" ) type ResponseCache struct { @@ -19,7 +20,11 @@ type CacheBackend struct { cacheServeCount uint64 serveCount uint64 cacheType string - mtx *sync.Mutex + mtx *sync.RWMutex + + // subscribe to cache for same request URI + resultChan map[string]chan *ResponseCache + subscribeCount map[string]int } func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { @@ -35,17 +40,22 @@ func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { cacheServeCount: 0, serveCount: 0, cacheType: cacheType, - mtx: new(sync.Mutex), + mtx: new(sync.RWMutex), + resultChan: make(map[string]chan *ResponseCache), + subscribeCount: make(map[string]int), } } -func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) { - if evicted := cb.lru.Add(cacheKey, &ResponseCache{ +func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) *ResponseCache { + response := &ResponseCache{ status: status, body: body, - }); evicted != false { + } + if evicted := cb.lru.Add(cacheKey, response); evicted != false { cb.evictionCount++ } + + return response } func (cb *CacheBackend) Get(cacheKey string) *ResponseCache { @@ -74,6 +84,8 @@ func (cb *CacheBackend) Purge() { cb.evictionCount = 0 cb.cacheServeCount = 0 cb.serveCount = 0 + cb.resultChan = make(map[string]chan *ResponseCache) + cb.subscribeCount = make(map[string]int) cb.mtx.Unlock() } @@ -82,9 +94,11 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht cb.serveCount++ cb.mtx.Unlock() + uri := request.URL.String() + + // see if this request is already made, and in transit // set response type as json writer.Header().Set("Content-Type", "application/json") - writer.Header().Set("Connection", "close") cached := cb.Get(request.URL.String()) // if cached, return as is @@ -98,15 +112,52 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht return } - recorder := httptest.NewRecorder() + cb.mtx.Lock() + resChan, isInTransit := cb.resultChan[uri] + + // if isInTransit is false, this is the first time we're processing this query + // run actual querier + if !isInTransit { + cb.resultChan[uri] = make(chan *ResponseCache) + cb.subscribeCount[uri] = 0 + cb.mtx.Unlock() + + recorder := httptest.NewRecorder() - // process request - handler.ServeHTTP(recorder, request) + // process request + handler.ServeHTTP(recorder, request) + + // set in cache + responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) + + // write + writer.WriteHeader(recorder.Code) + writer.Write(recorder.Body.Bytes()) + + // feed all subscriptions + cb.mtx.RLock() + c := cb.resultChan[uri] + subscribeCount := cb.subscribeCount[uri] + cb.mtx.RUnlock() + for i := 0; i < subscribeCount; i++ { + c <- responseCacheBody + } + + cb.mtx.Lock() + delete(cb.subscribeCount, uri) + delete(cb.resultChan, uri) + cb.mtx.Unlock() + + return + } + + // same query is processing but not cached yet. + // subscribe for cache result here. + cb.subscribeCount[uri]++ + cb.mtx.Unlock() - // set in cache - cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) + response := <-resChan - // write - writer.WriteHeader(recorder.Code) - writer.Write(recorder.Body.Bytes()) + writer.WriteHeader(response.status) + writer.Write(response.body) } From c796f0e8b6ec09fdb2b5ccd48f0e9eeb95f391df Mon Sep 17 00:00:00 2001 From: Jeff Woo Date: Tue, 21 Dec 2021 17:22:45 +0900 Subject: [PATCH 3/5] fix: keep cache channel from purging --- bin/v0.34.x/rpc/cache.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index ac09e22..4d55242 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -84,8 +84,6 @@ func (cb *CacheBackend) Purge() { cb.evictionCount = 0 cb.cacheServeCount = 0 cb.serveCount = 0 - cb.resultChan = make(map[string]chan *ResponseCache) - cb.subscribeCount = make(map[string]int) cb.mtx.Unlock() } From 247d2f7e254234bc44c3ba3a9a06f631044e571a Mon Sep 17 00:00:00 2001 From: Jeff Woo Date: Wed, 22 Dec 2021 22:44:53 +0900 Subject: [PATCH 4/5] feat: close channel after response --- bin/v0.34.x/rpc/cache.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index 4d55242..badc614 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -116,7 +116,8 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht // if isInTransit is false, this is the first time we're processing this query // run actual querier if !isInTransit { - cb.resultChan[uri] = make(chan *ResponseCache) + c := make(chan *ResponseCache) + cb.resultChan[uri] = c cb.subscribeCount[uri] = 0 cb.mtx.Unlock() @@ -134,12 +135,12 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht // feed all subscriptions cb.mtx.RLock() - c := cb.resultChan[uri] subscribeCount := cb.subscribeCount[uri] cb.mtx.RUnlock() for i := 0; i < subscribeCount; i++ { c <- responseCacheBody } + close(c) cb.mtx.Lock() delete(cb.subscribeCount, uri) From 2d35844b1fd293efa9ea8d3caf50111e4584136e Mon Sep 17 00:00:00 2001 From: Jeff Woo Date: Fri, 14 Jan 2022 01:28:12 +0900 Subject: [PATCH 5/5] fix: close go channel on http serve is failure --- bin/v0.34.x/rpc/cache.go | 50 +++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index badc614..32aa0f5 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -97,6 +97,7 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht // see if this request is already made, and in transit // set response type as json writer.Header().Set("Content-Type", "application/json") + writer.Header().Set("Connection", "close") cached := cb.Get(request.URL.String()) // if cached, return as is @@ -122,31 +123,39 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht cb.mtx.Unlock() recorder := httptest.NewRecorder() + var cache *ResponseCache + + go func() { + <-request.Context().Done() + + // feed all subscriptions + cb.mtx.RLock() + subscribeCount := cb.subscribeCount[uri] + cb.mtx.RUnlock() + + if cache != nil { + for i := 0; i < subscribeCount; i++ { + c <- cache + } + } + close(c) + + cb.mtx.Lock() + delete(cb.subscribeCount, uri) + delete(cb.resultChan, uri) + cb.mtx.Unlock() + }() // process request handler.ServeHTTP(recorder, request) // set in cache - responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) + cache = cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) // write writer.WriteHeader(recorder.Code) writer.Write(recorder.Body.Bytes()) - // feed all subscriptions - cb.mtx.RLock() - subscribeCount := cb.subscribeCount[uri] - cb.mtx.RUnlock() - for i := 0; i < subscribeCount; i++ { - c <- responseCacheBody - } - close(c) - - cb.mtx.Lock() - delete(cb.subscribeCount, uri) - delete(cb.resultChan, uri) - cb.mtx.Unlock() - return } @@ -155,8 +164,13 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht cb.subscribeCount[uri]++ cb.mtx.Unlock() - response := <-resChan + response, ok := <-resChan - writer.WriteHeader(response.status) - writer.Write(response.body) + if ok { + writer.WriteHeader(response.status) + writer.Write(response.body) + } else { + writer.WriteHeader(503) + writer.Write([]byte("Service Unavailable")) + } }