From 87b6c91c76912db6bd33268c4a4f6e6a3e685ef6 Mon Sep 17 00:00:00 2001 From: Jeff Woo Date: Wed, 1 Dec 2021 15:57:36 +0900 Subject: [PATCH] feat: wait for same query's cache (#8) * wip * feat: subscribe for cache channel * test: test cache module Co-authored-by: Jesse Chung --- bin/v0.34.x/rpc/cache.go | 71 +++++++++++++++++++++++++++++------ bin/v0.34.x/rpc/cache_test.go | 12 ++++-- bin/v0.34.x/rpc/client.go | 1 + 3 files changed, 69 insertions(+), 15 deletions(-) diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index 1e84494..a8eb37b 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -2,10 +2,11 @@ package rpc import ( "fmt" - lru "github.com/hashicorp/golang-lru" "net/http" "net/http/httptest" "sync" + + lru "github.com/hashicorp/golang-lru" ) type ResponseCache struct { @@ -20,6 +21,10 @@ type CacheBackend struct { serveCount uint64 cacheType string mtx *sync.Mutex + + // subscribe to cache for same request URI + resultChan map[string]chan *ResponseCache + subscribeCount map[string]int } func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { @@ -36,16 +41,21 @@ func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { serveCount: 0, cacheType: cacheType, mtx: new(sync.Mutex), + resultChan: make(map[string]chan *ResponseCache), + subscribeCount: make(map[string]int), } } -func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) { - if evicted := cb.lru.Add(cacheKey, &ResponseCache{ +func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) *ResponseCache { + response := &ResponseCache{ status: status, body: body, - }); evicted != false { + } + if evicted := cb.lru.Add(cacheKey, response); evicted != false { cb.evictionCount++ } + + return response } func (cb *CacheBackend) Get(cacheKey string) *ResponseCache { @@ -74,6 +84,8 @@ func (cb *CacheBackend) Purge() { cb.evictionCount = 0 cb.cacheServeCount = 0 cb.serveCount = 0 + cb.resultChan = make(map[string]chan *ResponseCache) + cb.subscribeCount = make(map[string]int) cb.mtx.Unlock() } @@ -82,6 +94,9 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht cb.serveCount++ cb.mtx.Unlock() + uri := request.URL.String() + + // see if this request is already made, and in transit // set response type as json writer.Header().Set("Content-Type", "application/json") @@ -97,15 +112,47 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht return } - recorder := httptest.NewRecorder() + cb.mtx.Lock() + _, isInTransit := cb.resultChan[uri] + + // if isInTransit is false, this is the first time we're processing this query + // run actual querier + if !isInTransit { + cb.resultChan[uri] = make(chan *ResponseCache) + cb.subscribeCount[uri] = 0 + cb.mtx.Unlock() + + recorder := httptest.NewRecorder() - // process request - handler.ServeHTTP(recorder, request) + // process request + handler.ServeHTTP(recorder, request) - // set in cache - cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) + // set in cache + responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes()) + + // write + writer.WriteHeader(recorder.Code) + writer.Write(recorder.Body.Bytes()) + + // feed all subscriptions + for i := 0; i < cb.subscribeCount[uri]; i++ { + cb.resultChan[uri] <- responseCacheBody + } + + cb.mtx.Lock() + delete(cb.subscribeCount, uri) + delete(cb.resultChan, uri) + cb.mtx.Unlock() + + return + } + + // same query is processing but not cached yet. + // subscribe for cache result here. + cb.subscribeCount[uri]++ + cb.mtx.Unlock() + response := <-cb.resultChan[uri] - // write - writer.WriteHeader(recorder.Code) - writer.Write(recorder.Body.Bytes()) + writer.WriteHeader(response.status) + writer.Write(response.body) } diff --git a/bin/v0.34.x/rpc/cache_test.go b/bin/v0.34.x/rpc/cache_test.go index e757c79..e8aceaf 100644 --- a/bin/v0.34.x/rpc/cache_test.go +++ b/bin/v0.34.x/rpc/cache_test.go @@ -1,14 +1,16 @@ package rpc import ( - "github.com/stretchr/testify/assert" + "fmt" "net/http" "net/http/httptest" "testing" + + "github.com/stretchr/testify/assert" ) func TestCacheBackend(t *testing.T) { - cb := NewCacheBackend(1) + cb := NewCacheBackend(1, "test") cb.Set("key", 200, []byte("hello world")) cached := cb.Get("key") @@ -42,7 +44,10 @@ func TestCacheBackend(t *testing.T) { cb.HandleCachedHTTP(testRes, testReq, handler) cb.HandleCachedHTTP(testRes, testReq, handler) - assert.Equal(t, 1, cb.Purge()) + fmt.Println(callCount) + assert.Equal(t, 1, callCount) + + cb.Purge() callCount = 0 cb.HandleCachedHTTP(testRes, testReq, handler) @@ -52,6 +57,7 @@ func TestCacheBackend(t *testing.T) { cb.HandleCachedHTTP(testRes, testReq, handler) cb.HandleCachedHTTP(testRes, testReq, handler) + fmt.Println(callCount) assert.Equal(t, callCount, 1) } diff --git a/bin/v0.34.x/rpc/client.go b/bin/v0.34.x/rpc/client.go index 6f86611..7cfbe4d 100644 --- a/bin/v0.34.x/rpc/client.go +++ b/bin/v0.34.x/rpc/client.go @@ -3,6 +3,7 @@ package rpc import ( "context" "fmt" + abcicli "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/bytes"