[FAB-11321] Alleviating lock contention of MSP cache
This patch replaces the LRU in the MSP cache with a second-chance algorithm,
an approximate LRU algorithm, in order to remove mutex locks.

With the second-chance algorithm, we can use RW locks to guard cache
items for concurrent accesses, so this change significantly reduces
lock contention when TPS is high.

Change-Id: Ic21873596ab83c5605f41e7a14987e586d970b63
Signed-off-by: Yohei Ueda <[email protected]>
yoheiueda committed Sep 1, 2018
1 parent 2ed5537 commit cca4004
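
For context on the claim above, here is a minimal standalone sketch of the two read paths (illustrative only, not code from this commit; the package and type names are made up). A classic LRU must take an exclusive mutex even on a hit, because groupcache/lru's Get reorders its recency list, whereas the second-chance scheme records recency with an atomic flag, so hits proceed under a shared read lock.

package sketch

import (
	"sync"
	"sync/atomic"

	"github.com/golang/groupcache/lru"
)

// Before: every read serializes on the exclusive mutex, because a hit
// mutates the LRU recency list.
type mutexLRU struct {
	mu    sync.Mutex
	cache *lru.Cache
}

func (c *mutexLRU) get(key string) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.cache.Get(key)
}

// After: recency is an atomic flag, so concurrent hits share a read lock;
// only insertions and victim scans need the write lock.
type entry struct {
	value      interface{}
	referenced int32
}

type rwSecondChance struct {
	rwlock sync.RWMutex
	table  map[string]*entry
}

func (c *rwSecondChance) get(key string) (interface{}, bool) {
	c.rwlock.RLock()
	defer c.rwlock.RUnlock()
	item, ok := c.table[key]
	if !ok {
		return nil, false
	}
	atomic.StoreInt32(&item.referenced, 1) // recency recorded without a write lock
	return item.value, true
}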
Showing 8 changed files with 249 additions and 384 deletions.
9 changes: 0 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default.

4 changes: 0 additions & 4 deletions Gopkg.toml
@@ -36,10 +36,6 @@ noverify = [
name = "github.com/fsouza/go-dockerclient"
version = "1.2.0"

[[constraint]]
branch = "master"
name = "github.com/golang/groupcache"

[[constraint]]
name = "github.com/golang/protobuf"
version = "1.1.0"
55 changes: 17 additions & 38 deletions msp/cache/cache.go
@@ -7,13 +7,10 @@ SPDX-License-Identifier: Apache-2.0
package cache

import (
"fmt"
"sync"

"github.com/golang/groupcache/lru"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/msp"
pmsp "github.com/hyperledger/fabric/protos/msp"
"github.com/pkg/errors"
)

const (
@@ -27,13 +24,13 @@ var mspLogger = flogging.MustGetLogger("msp")
func New(o msp.MSP) (msp.MSP, error) {
mspLogger.Debugf("Creating Cache-MSP instance")
if o == nil {
return nil, fmt.Errorf("Invalid passed MSP. It must be different from nil.")
return nil, errors.Errorf("Invalid passed MSP. It must be different from nil.")
}

theMsp := &cachedMSP{MSP: o}
theMsp.deserializeIdentityCache = lru.New(deserializeIdentityCacheSize)
theMsp.satisfiesPrincipalCache = lru.New(satisfiesPrincipalCacheSize)
theMsp.validateIdentityCache = lru.New(validateIdentityCacheSize)
theMsp.deserializeIdentityCache = newSecondChanceCache(deserializeIdentityCacheSize)
theMsp.satisfiesPrincipalCache = newSecondChanceCache(satisfiesPrincipalCacheSize)
theMsp.validateIdentityCache = newSecondChanceCache(validateIdentityCacheSize)

return theMsp, nil
}
@@ -42,20 +39,14 @@ type cachedMSP struct {
msp.MSP

// cache for DeserializeIdentity.
deserializeIdentityCache *lru.Cache

dicMutex sync.Mutex // synchronize access to cache
deserializeIdentityCache *secondChanceCache

// cache for validateIdentity
validateIdentityCache *lru.Cache

vicMutex sync.Mutex // synchronize access to cache
validateIdentityCache *secondChanceCache

// basically a map of principals=>identities=>stringified to booleans
// specifying whether this identity satisfies this principal
satisfiesPrincipalCache *lru.Cache

spcMutex sync.Mutex // synchronize access to cache
satisfiesPrincipalCache *secondChanceCache
}

type cachedIdentity struct {
@@ -72,9 +63,7 @@ func (id *cachedIdentity) Validate() error {
}

func (c *cachedMSP) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
c.dicMutex.Lock()
id, ok := c.deserializeIdentityCache.Get(string(serializedIdentity))
c.dicMutex.Unlock()
id, ok := c.deserializeIdentityCache.get(string(serializedIdentity))
if ok {
return &cachedIdentity{
cache: c,
@@ -84,9 +73,7 @@ func (c *cachedMSP) DeserializeIdentity(serializedIdentity []byte) (msp.Identity

id, err := c.MSP.DeserializeIdentity(serializedIdentity)
if err == nil {
c.dicMutex.Lock()
defer c.dicMutex.Unlock()
c.deserializeIdentityCache.Add(string(serializedIdentity), id)
c.deserializeIdentityCache.add(string(serializedIdentity), id)
return &cachedIdentity{
cache: c,
Identity: id.(msp.Identity),
@@ -105,19 +92,15 @@ func (c *cachedMSP) Validate(id msp.Identity) error {
identifier := id.GetIdentifier()
key := string(identifier.Mspid + ":" + identifier.Id)

c.vicMutex.Lock()
_, ok := c.validateIdentityCache.Get(key)
c.vicMutex.Unlock()
_, ok := c.validateIdentityCache.get(key)
if ok {
// cache only stores if the identity is valid.
return nil
}

err := c.MSP.Validate(id)
if err == nil {
c.vicMutex.Lock()
defer c.vicMutex.Unlock()
c.validateIdentityCache.Add(key, true)
c.validateIdentityCache.add(key, true)
}

return err
@@ -129,9 +112,7 @@ func (c *cachedMSP) SatisfiesPrincipal(id msp.Identity, principal *pmsp.MSPPrinc
principalKey := string(principal.PrincipalClassification) + string(principal.Principal)
key := identityKey + principalKey

c.spcMutex.Lock()
v, ok := c.satisfiesPrincipalCache.Get(key)
c.spcMutex.Unlock()
v, ok := c.satisfiesPrincipalCache.get(key)
if ok {
if v == nil {
return nil
@@ -142,16 +123,14 @@ func (c *cachedMSP) SatisfiesPrincipal(id msp.Identity, principal *pmsp.MSPPrinc

err := c.MSP.SatisfiesPrincipal(id, principal)

c.spcMutex.Lock()
defer c.spcMutex.Unlock()
c.satisfiesPrincipalCache.Add(key, err)
c.satisfiesPrincipalCache.add(key, err)
return err
}

func (c *cachedMSP) cleanCash() error {
c.deserializeIdentityCache = lru.New(deserializeIdentityCacheSize)
c.satisfiesPrincipalCache = lru.New(satisfiesPrincipalCacheSize)
c.validateIdentityCache = lru.New(validateIdentityCacheSize)
c.deserializeIdentityCache = newSecondChanceCache(deserializeIdentityCacheSize)
c.satisfiesPrincipalCache = newSecondChanceCache(satisfiesPrincipalCacheSize)
c.validateIdentityCache = newSecondChanceCache(validateIdentityCacheSize)

return nil
}
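
To make the wrapper's behaviour concrete, a hedged usage sketch follows (not part of this diff; the function name and call pattern are assumptions, only cache.New and the msp interfaces come from the code above). The first Validate call delegates to the wrapped MSP and caches the success under the "Mspid:Id" key; the second is answered from validateIdentityCache.

package sketch

import (
	"github.com/hyperledger/fabric/msp"
	"github.com/hyperledger/fabric/msp/cache"
)

// validateTwice wraps a concrete MSP with the caching decorator and
// validates the same identity twice.
func validateTwice(base msp.MSP, id msp.Identity) error {
	wrapped, err := cache.New(base)
	if err != nil {
		return err
	}
	if err := wrapped.Validate(id); err != nil { // cache miss: delegates to base
		return err
	}
	return wrapped.Validate(id) // cache hit: no call to the underlying MSP
}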
18 changes: 9 additions & 9 deletions msp/cache/cache_test.go
@@ -38,9 +38,9 @@ func TestSetup(t *testing.T) {
err = i.Setup(nil)
assert.NoError(t, err)
mockMSP.AssertExpectations(t)
assert.Equal(t, 0, i.(*cachedMSP).deserializeIdentityCache.Len())
assert.Equal(t, 0, i.(*cachedMSP).satisfiesPrincipalCache.Len())
assert.Equal(t, 0, i.(*cachedMSP).validateIdentityCache.Len())
assert.Equal(t, 0, i.(*cachedMSP).deserializeIdentityCache.len())
assert.Equal(t, 0, i.(*cachedMSP).satisfiesPrincipalCache.len())
assert.Equal(t, 0, i.(*cachedMSP).validateIdentityCache.len())
}

func TestGetType(t *testing.T) {
@@ -152,7 +152,7 @@ func TestDeserializeIdentity(t *testing.T) {

mockMSP.AssertExpectations(t)
// Check the cache
_, ok := wrappedMSP.(*cachedMSP).deserializeIdentityCache.Get(string(serializedIdentity))
_, ok := wrappedMSP.(*cachedMSP).deserializeIdentityCache.get(string(serializedIdentity))
assert.True(t, ok)

// Check the same object is returned
@@ -170,7 +170,7 @@
assert.Contains(t, err.Error(), "Invalid identity")
mockMSP.AssertExpectations(t)

_, ok = wrappedMSP.(*cachedMSP).deserializeIdentityCache.Get(string(serializedIdentity))
_, ok = wrappedMSP.(*cachedMSP).deserializeIdentityCache.get(string(serializedIdentity))
assert.False(t, ok)
}

@@ -190,7 +190,7 @@ func TestValidate(t *testing.T) {
// Check the cache
identifier := mockIdentity.GetIdentifier()
key := string(identifier.Mspid + ":" + identifier.Id)
v, ok := i.(*cachedMSP).validateIdentityCache.Get(string(key))
v, ok := i.(*cachedMSP).validateIdentityCache.get(string(key))
assert.True(t, ok)
assert.True(t, v.(bool))

@@ -209,7 +209,7 @@ func TestValidate(t *testing.T) {
// Check the cache
identifier = mockIdentity.GetIdentifier()
key = string(identifier.Mspid + ":" + identifier.Id)
_, ok = i.(*cachedMSP).validateIdentityCache.Get(string(key))
_, ok = i.(*cachedMSP).validateIdentityCache.get(string(key))
assert.False(t, ok)
}

@@ -293,7 +293,7 @@ func TestSatisfiesPrincipal(t *testing.T) {
identityKey := string(identifier.Mspid + ":" + identifier.Id)
principalKey := string(mockMSPPrincipal.PrincipalClassification) + string(mockMSPPrincipal.Principal)
key := identityKey + principalKey
v, ok := i.(*cachedMSP).satisfiesPrincipalCache.Get(key)
v, ok := i.(*cachedMSP).satisfiesPrincipalCache.get(key)
assert.True(t, ok)
assert.Nil(t, v)

@@ -316,7 +316,7 @@ func TestSatisfiesPrincipal(t *testing.T) {
identityKey = string(identifier.Mspid + ":" + identifier.Id)
principalKey = string(mockMSPPrincipal.PrincipalClassification) + string(mockMSPPrincipal.Principal)
key = identityKey + principalKey
v, ok = i.(*cachedMSP).satisfiesPrincipalCache.Get(key)
v, ok = i.(*cachedMSP).satisfiesPrincipalCache.get(key)
assert.True(t, ok)
assert.NotNil(t, v)
assert.Contains(t, "Invalid", v.(error).Error())
114 changes: 114 additions & 0 deletions msp/cache/second_chance.go
@@ -0,0 +1,114 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cache

import (
"sync"
"sync/atomic"
)

// This package implements the Second-Chance Algorithm, an approximate LRU algorithm.
// https://www.cs.jhu.edu/~yairamir/cs418/os6/tsld023.htm

// secondChanceCache holds key-value items with a limited size.
// When the number of cached items exceeds the limit, victims are selected based on the
// Second-Chance Algorithm and are purged.
type secondChanceCache struct {
// manages mapping between keys and items
table map[string]*cacheItem

// holds a list of cached items.
items []*cacheItem

// indicates the next candidate of a victim in the items list
position int

// read lock for get, and write lock for add
rwlock sync.RWMutex
}

type cacheItem struct {
key string
value interface{}
// set to 1 when get() is called. set to 0 during a victim scan
referenced int32
}

func newSecondChanceCache(cacheSize int) *secondChanceCache {
var cache secondChanceCache
cache.position = 0
cache.items = make([]*cacheItem, cacheSize)
cache.table = make(map[string]*cacheItem)

return &cache
}

func (cache *secondChanceCache) len() int {
cache.rwlock.RLock()
defer cache.rwlock.RUnlock()

return len(cache.table)
}

func (cache *secondChanceCache) get(key string) (interface{}, bool) {
cache.rwlock.RLock()
defer cache.rwlock.RUnlock()

item, ok := cache.table[key]
if !ok {
return nil, false
}

// referenced bit is set to true to indicate that this item is recently accessed.
atomic.StoreInt32(&item.referenced, 1)

return item.value, true
}

func (cache *secondChanceCache) add(key string, value interface{}) {
cache.rwlock.Lock()
defer cache.rwlock.Unlock()

if old, ok := cache.table[key]; ok {
old.value = value
atomic.StoreInt32(&old.referenced, 1)
return
}

var item cacheItem
item.key = key
item.value = value
atomic.StoreInt32(&item.referenced, 1)

size := len(cache.items)
num := len(cache.table)
if num < size {
// cache is not full, so just store the new item at the end of the list
cache.table[key] = &item
cache.items[num] = &item
return
}

// starts victim scan since cache is full
for {
// checks whether this item was recently accessed or not
victim := cache.items[cache.position]
if atomic.LoadInt32(&victim.referenced) == 0 {
// a victim is found. delete it, and store the new item here.
delete(cache.table, victim.key)
cache.table[key] = &item
cache.items[cache.position] = &item
cache.position = (cache.position + 1) % size
return
}

// referenced bit is set to false so that this item will be purged
// unless it is accessed again before the next victim scan
atomic.StoreInt32(&victim.referenced, 0)
cache.position = (cache.position + 1) % size
}
}
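
To illustrate the eviction order, here is a hedged test-style sketch (written for this note, not taken from the commit; it assumes it sits next to second_chance.go in package cache). With capacity 3, adding a fourth key clears every referenced bit during the scan and evicts the slot the clock hand revisits first; after that, a key that was read again survives the next scan while an untouched one is evicted.

package cache

import "testing"

func TestSecondChanceEvictionSketch(t *testing.T) {
	c := newSecondChanceCache(3)
	c.add("a", 1)
	c.add("b", 2)
	c.add("c", 3)

	// Cache is full: adding "d" clears all referenced bits during the scan
	// and evicts "a", the first slot the hand comes back to.
	c.add("d", 4)
	if _, ok := c.get("a"); ok {
		t.Fatal("expected a to be evicted")
	}

	// Touch "b" so its referenced bit is set again.
	c.get("b")

	// Adding "e" gives "b" a second chance and evicts "c" instead.
	c.add("e", 5)
	if _, ok := c.get("b"); !ok {
		t.Fatal("expected b to survive the scan")
	}
	if _, ok := c.get("c"); ok {
		t.Fatal("expected c to be evicted")
	}
}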
