Skip to content

Commit

Permalink
feat: remove concurrent query cache mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
hanjukim committed Jan 10, 2022
1 parent 11d61c2 commit 7b39f85
Showing 1 changed file with 15 additions and 66 deletions.
81 changes: 15 additions & 66 deletions bin/v0.34.x/rpc/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package rpc

import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"net/http"
"net/http/httptest"
"sync"

lru "github.com/hashicorp/golang-lru"
)

type ResponseCache struct {
Expand All @@ -20,11 +19,7 @@ type CacheBackend struct {
cacheServeCount uint64
serveCount uint64
cacheType string
mtx *sync.RWMutex

// subscribe to cache for same request URI
resultChan map[string]chan *ResponseCache
subscribeCount map[string]int
mtx *sync.Mutex
}

func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend {
Expand All @@ -40,22 +35,17 @@ func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend {
cacheServeCount: 0,
serveCount: 0,
cacheType: cacheType,
mtx: new(sync.RWMutex),
resultChan: make(map[string]chan *ResponseCache),
subscribeCount: make(map[string]int),
mtx: new(sync.Mutex),
}
}

func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) *ResponseCache {
response := &ResponseCache{
func (cb *CacheBackend) Set(cacheKey string, status int, body []byte) {
if evicted := cb.lru.Add(cacheKey, &ResponseCache{
status: status,
body: body,
}
if evicted := cb.lru.Add(cacheKey, response); evicted != false {
}); evicted != false {
cb.evictionCount++
}

return response
}

func (cb *CacheBackend) Get(cacheKey string) *ResponseCache {
Expand Down Expand Up @@ -84,8 +74,6 @@ func (cb *CacheBackend) Purge() {
cb.evictionCount = 0
cb.cacheServeCount = 0
cb.serveCount = 0
cb.resultChan = make(map[string]chan *ResponseCache)
cb.subscribeCount = make(map[string]int)
cb.mtx.Unlock()
}

Expand All @@ -94,11 +82,9 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht
cb.serveCount++
cb.mtx.Unlock()

uri := request.URL.String()

// see if this request is already made, and in transit
// set response type as json
writer.Header().Set("Content-Type", "application/json")
writer.Header().Set("Connection", "close")

cached := cb.Get(request.URL.String())
// if cached, return as is
Expand All @@ -112,52 +98,15 @@ func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *ht
return
}

cb.mtx.Lock()
resChan, isInTransit := cb.resultChan[uri]

// if isInTransit is false, this is the first time we're processing this query
// run actual querier
if !isInTransit {
cb.resultChan[uri] = make(chan *ResponseCache)
cb.subscribeCount[uri] = 0
cb.mtx.Unlock()

recorder := httptest.NewRecorder()
recorder := httptest.NewRecorder()

// process request
handler.ServeHTTP(recorder, request)

// set in cache
responseCacheBody := cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes())

// write
writer.WriteHeader(recorder.Code)
writer.Write(recorder.Body.Bytes())

// feed all subscriptions
cb.mtx.RLock()
c := cb.resultChan[uri]
subscribeCount := cb.subscribeCount[uri]
cb.mtx.RUnlock()
for i := 0; i < subscribeCount; i++ {
c <- responseCacheBody
}

cb.mtx.Lock()
delete(cb.subscribeCount, uri)
delete(cb.resultChan, uri)
cb.mtx.Unlock()

return
}

// same query is processing but not cached yet.
// subscribe for cache result here.
cb.subscribeCount[uri]++
cb.mtx.Unlock()
// process request
handler.ServeHTTP(recorder, request)

response := <-resChan
// set in cache
cb.Set(request.URL.String(), recorder.Code, recorder.Body.Bytes())

writer.WriteHeader(response.status)
writer.Write(response.body)
// write
writer.WriteHeader(recorder.Code)
writer.Write(recorder.Body.Bytes())
}

0 comments on commit 7b39f85

Please sign in to comment.