Skip to content

Commit

Permalink
feat: wait for same query's cache (#8)
Browse files Browse the repository at this point in the history
* wip

* feat: subscribe for cache channel

* test: test cache module

Co-authored-by: Jesse Chung <[email protected]>
  • Loading branch information
Jeff Woo and kjessec authored Dec 1, 2021
1 parent e751d1d commit 87b6c91
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 15 deletions.
71 changes: 59 additions & 12 deletions bin/v0.34.x/rpc/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package rpc

import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"net/http"
"net/http/httptest"
"sync"

lru "github.com/hashicorp/golang-lru"
)

type ResponseCache struct {
Expand All @@ -20,6 +21,10 @@ type CacheBackend struct {
serveCount uint64
cacheType string
mtx *sync.Mutex

// subscribe to cache for same request URI
resultChan map[string]chan *ResponseCache
subscribeCount map[string]int
}

func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend {
Expand All @@ -36,16 +41,21 @@ func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend {
serveCount: 0,
cacheType: cacheType,
mtx: new(sync.Mutex),
resultChan: make(map[string]chan *ResponseCache),
subscribeCount: make(map[string]int),
}
}

func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) {
if evicted := cb.lru.Add(cacheKey, &ResponseCache{
func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) *ResponseCache {
response := &ResponseCache{
status: status,
body: body,
}); evicted != false {
}
if evicted := cb.lru.Add(cacheKey, response); evicted != false {
cb.evictionCount++
}

return response
}

func (cb *CacheBackend) Get(cacheKey string) *ResponseCache {
Expand Down Expand Up @@ -74,6 +84,8 @@ func (cb *CacheBackend) Purge() {
cb.evictionCount = 0
cb.cacheServeCount = 0
cb.serveCount = 0
cb.resultChan = make(map[string]chan *ResponseCache)
cb.subscribeCount = make(map[string]int)
cb.mtx.Unlock()
}

Expand All @@ -82,6 +94,9 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht
cb.serveCount++
cb.mtx.Unlock()

uri := request.URL.String()

// see if this request is already made, and in transit
// set response type as json
writer.Header().Set("Content-Type", "application/json")

Expand All @@ -97,15 +112,47 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht
return
}

recorder := httptest.NewRecorder()
cb.mtx.Lock()
_, isInTransit := cb.resultChan[uri]

// if isInTransit is false, this is the first time we're processing this query
// run actual querier
if !isInTransit {
cb.resultChan[uri] = make(chan *ResponseCache)
cb.subscribeCount[uri] = 0
cb.mtx.Unlock()

recorder := httptest.NewRecorder()

// process request
handler.ServeHTTP(recorder, request)
// process request
handler.ServeHTTP(recorder, request)

// set in cache
cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes())
// set in cache
responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes())

// write
writer.WriteHeader(recorder.Code)
writer.Write(recorder.Body.Bytes())

// feed all subscriptions
for i := 0; i < cb.subscribeCount[uri]; i++ {
cb.resultChan[uri] <- responseCacheBody
}

cb.mtx.Lock()
delete(cb.subscribeCount, uri)
delete(cb.resultChan, uri)
cb.mtx.Unlock()

return
}

// same query is processing but not cached yet.
// subscribe for cache result here.
cb.subscribeCount[uri]++
cb.mtx.Unlock()
response := <-cb.resultChan[uri]

// write
writer.WriteHeader(recorder.Code)
writer.Write(recorder.Body.Bytes())
writer.WriteHeader(response.status)
writer.Write(response.body)
}
12 changes: 9 additions & 3 deletions bin/v0.34.x/rpc/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package rpc

import (
"github.com/stretchr/testify/assert"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCacheBackend(t *testing.T) {
cb := NewCacheBackend(1)
cb := NewCacheBackend(1, "test")

cb.Set("key", 200, []byte("hello world"))
cached := cb.Get("key")
Expand Down Expand Up @@ -42,7 +44,10 @@ func TestCacheBackend(t *testing.T) {
cb.HandleCachedHTTP(testRes, testReq, handler)
cb.HandleCachedHTTP(testRes, testReq, handler)

assert.Equal(t, 1, cb.Purge())
fmt.Println(callCount)
assert.Equal(t, 1, callCount)

cb.Purge()

callCount = 0
cb.HandleCachedHTTP(testRes, testReq, handler)
Expand All @@ -52,6 +57,7 @@ func TestCacheBackend(t *testing.T) {
cb.HandleCachedHTTP(testRes, testReq, handler)
cb.HandleCachedHTTP(testRes, testReq, handler)

fmt.Println(callCount)
assert.Equal(t, callCount, 1)

}
1 change: 1 addition & 0 deletions bin/v0.34.x/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"fmt"

abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/bytes"
Expand Down

0 comments on commit 87b6c91

Please sign in to comment.