Skip to content

Commit

Permalink
[FAB-11056] Fix caching in Fabric & Dynamic Selection
Browse files Browse the repository at this point in the history
Both Fabric selection and Dynamic selection services cache
GetEndorsersForChaincode requests even if the request contains
a bad invocation chain. This patch adds an 'expiring' feature
to lazycache that combines the features of lazycache with
lazyref. Now if lazyref.Get() returns an error then the ref
will not be cached.

Change-Id: Ia5022c748d1dd504cd7d8a29f0bf0b5527edc76f
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Jul 4, 2018
1 parent 2ad5906 commit 0c6b7f1
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 101 deletions.
15 changes: 3 additions & 12 deletions pkg/client/common/selection/dynamicselection/dynamicselection.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,9 @@ func newService(context context.Client, channelID string, discovery fab.Discover
service.pgResolvers = lazycache.New(
"PG_Resolver_Cache",
func(key lazycache.Key) (interface{}, error) {
return lazyref.New(
func() (interface{}, error) {
return service.createPGResolver(key.(*resolverKey))
},
lazyref.WithAbsoluteExpiration(service.cacheTimeout),
), nil
return service.createPGResolver(key.(*resolverKey))
},
lazyref.WithAbsoluteExpiration(service.cacheTimeout),
)

return service, nil
Expand Down Expand Up @@ -150,12 +146,7 @@ func (s *SelectionService) Close() {
}

func (s *SelectionService) getPeerGroupResolver(chaincodeIDs []string) (pgresolver.PeerGroupResolver, error) {
value, err := s.pgResolvers.Get(newResolverKey(s.channelID, chaincodeIDs...))
if err != nil {
return nil, err
}
lazyRef := value.(*lazyref.Reference)
resolver, err := lazyRef.Get()
resolver, err := s.pgResolvers.Get(newResolverKey(s.channelID, chaincodeIDs...))
if err != nil {
return nil, err
}
Expand Down
36 changes: 12 additions & 24 deletions pkg/client/common/selection/fabricselection/fabricselection.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,19 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService
}

s.chResponseCache = lazycache.New(
"Channel_Response_Cache",
"Fabric_Selection_Cache",
func(key lazycache.Key) (interface{}, error) {
return s.newChannelResponseRef(key.(*cacheKey).chaincodes, options.refreshInterval), nil
invocationChain := key.(*cacheKey).chaincodes
if logging.IsEnabledFor(moduleName, logging.DEBUG) {
key, err := json.Marshal(invocationChain)
if err != nil {
panic(fmt.Sprintf("marshal of chaincodes failed: %s", err))
}
logger.Debugf("Refreshing endorsers for chaincodes [%s] in channel [%s] from discovery service...", key, channelID)
}
return s.queryEndorsers(invocationChain)
},
lazyref.WithRefreshInterval(lazyref.InitImmediately, options.refreshInterval),
)

return s, nil
Expand Down Expand Up @@ -158,30 +167,9 @@ func (s *Service) getEndorsers(chaincodes []*fab.ChaincodeCall, chResponse discc
return endpoints, err
}

func (s *Service) newChannelResponseRef(chaincodes []*fab.ChaincodeCall, refreshInterval time.Duration) *lazyref.Reference {
return lazyref.New(
func() (interface{}, error) {
if logging.IsEnabledFor(moduleName, logging.DEBUG) {
key, err := json.Marshal(chaincodes)
if err != nil {
panic(fmt.Sprintf("marshal of chaincodes failed: %s", err))
}

logger.Debugf("Refreshing endorsers for chaincodes [%s] in channel [%s] from discovery service...", key, s.channelID)
}
return s.queryEndorsers(chaincodes)
},
lazyref.WithRefreshInterval(lazyref.InitImmediately, refreshInterval),
)
}

func (s *Service) getChannelResponse(chaincodes []*fab.ChaincodeCall) (discclient.ChannelResponse, error) {
key := newCacheKey(chaincodes)
ref, err := s.chResponseCache.Get(key)
if err != nil {
return nil, err
}
chResp, err := ref.(*lazyref.Reference).Get()
chResp, err := s.chResponseCache.Get(key)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fabsdk/provider/chpvdr/chprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var logger = logging.NewLogger("fabsdk")

type cache interface {
Get(lazycache.Key) (interface{}, error)
Get(lazycache.Key, ...interface{}) (interface{}, error)
Close()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/fabsdk/provider/chpvdr/fabprovider_testing_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newChCfgRef(cfg fab.ChannelCfg) *chconfig.Ref {
}

// Get mock channel config reference
func (m *chCfgCache) Get(k lazycache.Key) (interface{}, error) {
func (m *chCfgCache) Get(k lazycache.Key, data ...interface{}) (interface{}, error) {
cfg, ok := m.cfgMap.Load(k.(chconfig.CacheKey).ChannelID())
if !ok {
return nil, errors.New("Channel config not found in cache")
Expand Down
108 changes: 98 additions & 10 deletions pkg/util/concurrent/lazycache/lazycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"sync/atomic"

"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/futurevalue"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref"
"github.com/pkg/errors"
)

Expand All @@ -26,6 +28,12 @@ type Key interface {
// EntryInitializer creates a cache value for the given key
type EntryInitializer func(key Key) (interface{}, error)

// EntryInitializerWithData creates a cache value for the given key and the
// additional data passed in from Get(). With expiring cache entries, the
// initializer is called with the same key, but the latest data is passed from
// the Get() call that triggered the data to be cached/re-cached.
type EntryInitializerWithData func(key Key, data interface{}) (interface{}, error)

type future interface {
Get() (interface{}, error)
MustGet() interface{}
Expand All @@ -44,16 +52,42 @@ type Cache struct {
// name is useful for debugging
name string
m sync.Map
initializer EntryInitializer
initializer EntryInitializerWithData
closed int32
useRef bool
}

// New creates a new lazy cache.
// - name identifies the cache and is used only for debug output
// - initializer is invoked the first time a given key is cached
// - opts are cache options. If any lazyref option is supplied then each cache
// entry holds a lazy reference to its value, enabling expiring values and
// values that proactively refresh.
func New(name string, initializer EntryInitializer, opts ...options.Opt) *Cache {
	// Adapt the data-less initializer to the data-aware form and delegate.
	adapted := func(key Key, _ interface{}) (interface{}, error) {
		return initializer(key)
	}
	return NewWithData(name, adapted, opts...)
}

// New creates a new lazy cache with the given name
// (Note that the name is only used for debugging purpose)
func New(name string, initializer EntryInitializer) *Cache {
// NewWithData creates a new lazy cache whose initializer accepts optional data
// that is passed through from Get().
// - name identifies the cache and is used only for debug output
// - initializer is invoked the first time a given key is cached
// - opts are cache options. If any lazyref option is supplied then each cache
// entry holds a lazy reference to its value, enabling expiring values and
// values that proactively refresh.
func NewWithData(name string, initializer EntryInitializerWithData, opts ...options.Opt) *Cache {
	withRef := useLazyRef(opts...)

	init := initializer
	if withRef {
		// Wrap the initializer so that entries hold lazy references.
		init = newLazyRefInitializer(name, initializer, opts...)
	}

	return &Cache{
		name:        name,
		initializer: init,
		useRef:      withRef,
	}
}

Expand All @@ -67,12 +101,16 @@ func (c *Cache) Name() string {
// to create the value, and the key is inserted. If the
// initializer returns an error then the key is removed
// from the cache.
func (c *Cache) Get(key Key) (interface{}, error) {
func (c *Cache) Get(key Key, data ...interface{}) (interface{}, error) {
keyStr := key.String()

f, ok := c.m.Load(keyStr)
if ok {
return f.(future).Get()
v, err := f.(future).Get()
if err != nil {
return nil, err
}
return c.value(v, first(data))
}

// The key wasn't found. Attempt to add one.
Expand All @@ -81,24 +119,29 @@ func (c *Cache) Get(key Key) (interface{}, error) {
if closed := atomic.LoadInt32(&c.closed); closed == 1 {
return nil, errors.Errorf("%s - cache is closed", c.name)
}
return c.initializer(key)
return c.initializer(key, first(data))
},
)

f, loaded := c.m.LoadOrStore(keyStr, newFuture)
if loaded {
// Another thread has added the key before us. Return the value.
return f.(future).Get()
v, err := f.(future).Get()
if err != nil {
return nil, err
}
return c.value(v, first(data))
}

// We added the key. It must be initailized.
// We added the key. It must be initialized.
value, err := newFuture.Initialize()
if err != nil {
// Failed. Delete the key.
logger.Debugf("%s - Failed to initialize key [%s]: %s. Deleting key.", c.name, keyStr, err)
c.m.Delete(keyStr)
return nil, err
}
return value, err
return c.value(value, first(data))
}

// MustGet returns the value for the given key. If the key doesn't
Expand Down Expand Up @@ -150,3 +193,48 @@ func (c *Cache) close(key string, f future) {
}
}
}

// newLazyRefInitializer wraps the given initializer so that each cache entry
// holds a lazyref.Reference rather than the raw value. The reference is
// resolved once, eagerly, before being returned for caching, so a reference
// whose initializer fails is never stored.
func newLazyRefInitializer(name string, initializer EntryInitializerWithData, opts ...options.Opt) EntryInitializerWithData {
	return func(key Key, data interface{}) (interface{}, error) {
		logger.Debugf("%s - Calling initializer for [%s], data [%#v]", name, key, data)

		reference := lazyref.NewWithData(
			func(refData interface{}) (interface{}, error) {
				logger.Debugf("%s - Calling lazyref initializer for [%s], data [%#v]", name, key, refData)
				return initializer(key, refData)
			},
			opts...,
		)

		// Resolve once up front: if lazyref.Get() fails, propagate the error
		// instead of caching a reference that always returns an error —
		// especially important for refreshing references.
		if _, err := reference.Get(data); err != nil {
			logger.Debugf("%s - Error returned from lazyref initializer [%s], data [%#v]: %s", name, key, data, err)
			return nil, err
		}

		logger.Debugf("%s - Returning lazyref for [%s], data [%#v]", name, key, data)
		return reference, nil
	}
}

// value resolves a cached entry to its final value. When the cache stores lazy
// references, the entry is a *lazyref.Reference and the value is obtained from
// Get(data); otherwise the entry itself is the value and is returned as-is.
func (c *Cache) value(v interface{}, data interface{}) (interface{}, error) {
	if !c.useRef || v == nil {
		return v, nil
	}
	return v.(*lazyref.Reference).Get(data)
}

// first returns the first element of data, or nil if data is empty.
func first(data []interface{}) interface{} {
	if len(data) > 0 {
		return data[0]
	}
	return nil
}

// useLazyRef returns true if the cache should use lazy references to hold the
// actual values, i.e. when at least one lazyref option was passed in opts.
// NOTE(review): refOptCheck is defined elsewhere in this file — presumably it
// implements the lazyref option interfaces and records whether any was applied.
func useLazyRef(opts ...options.Opt) bool {
	chk := &refOptCheck{}
	options.Apply(chk, opts)
	return chk.useRef
}
Loading

0 comments on commit 0c6b7f1

Please sign in to comment.