Skip to content

Commit

Permalink
[FAB-9516] Split cclifecycle update to 2 phases
Browse files Browse the repository at this point in the history
The lifecycle object that caches the deployed chaincodes of the peer
receives a notification from the ledger before the install of the
chaincode actually happens.
The install might fail, and then gossip would publish false information.

For this - the ledger listener separated the notification into 2 steps:

 1.  Notify about the deployment, which happens just before the install
 2.  Notify whether the deployment was successful or not, which is only
   relevant for install.

This change set moves the state update of the lifecycle object to the DeployDone()
phase (the 2nd phase from above)

Change-Id: I495f73278ce4e7d017085a6df4043794bc7005a7
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Apr 22, 2018
1 parent 7dc4a86 commit e360a6e
Show file tree
Hide file tree
Showing 12 changed files with 604 additions and 636 deletions.
75 changes: 46 additions & 29 deletions core/cclifecycle/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (

"github.com/hyperledger/fabric/common/chaincode"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/pkg/errors"
)

var (
logger = flogging.MustGetLogger("discovery/lifecycle")
// Logger is the logging instance for this package.
// It's exported because the tests override its backend
Logger = flogging.MustGetLogger("discovery/lifecycle")
)

// Lifecycle manages information regarding chaincode lifecycle
Expand All @@ -36,12 +39,16 @@ type LifeCycleChangeListener interface {
// HandleMetadataUpdate is triggered upon a change in the chaincode lifecycle change
type HandleMetadataUpdate func(channel string, chaincodes chaincode.MetadataSet)

// go:generate mockery -dir core/cclifecycle -name LifeCycleChangeListener -case underscore -output core/cclifecycle/mocks/

// LifeCycleChangeListener runs whenever there is a change to the metadata
// // of a chaincode in the context of a specific channel
func (mdUpdate HandleMetadataUpdate) LifeCycleChangeListener(channel string, chaincodes chaincode.MetadataSet) {
mdUpdate(channel, chaincodes)
}

// go:generate mockery -dir core/cclifecycle -name Enumerator -case underscore -output core/cclifecycle/mocks/

// Enumerator enumerates chaincodes
type Enumerator interface {
// Enumerate returns the installed chaincodes
Expand All @@ -56,6 +63,8 @@ func (listCCs Enumerate) Enumerate() ([]chaincode.InstalledChaincode, error) {
return listCCs()
}

// go:generate mockery -dir core/cclifecycle -name Query -case underscore -output core/cclifecycle/mocks/

// Query queries the state
type Query interface {
// GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
Expand All @@ -65,8 +74,21 @@ type Query interface {
Done()
}

// QueryCreator creates a new query
type QueryCreator func() (Query, error)
// go:generate mockery -dir core/cclifecycle -name QueryCreator -case underscore -output core/cclifecycle/mocks/

// QueryCreator creates queries
type QueryCreator interface {
// NewQuery creates a new Query, or error on failure
NewQuery() (Query, error)
}

// QueryCreatorFunc creates a new query
type QueryCreatorFunc func() (Query, error)

// NewQuery creates a new Query, or error on failure
func (qc QueryCreatorFunc) NewQuery() (Query, error) {
return qc()
}

// NewLifeCycle creates a new Lifecycle instance
func NewLifeCycle(installedChaincodes Enumerator) (*Lifecycle, error) {
Expand All @@ -87,48 +109,48 @@ func NewLifeCycle(installedChaincodes Enumerator) (*Lifecycle, error) {
// Metadata returns the metadata of the chaincode on the given channel,
// or nil if not found or an error occurred at retrieving it
func (lc *Lifecycle) Metadata(channel string, cc string) *chaincode.Metadata {
newQuery := lc.queryCreatorsByChannel[channel]
if newQuery == nil {
logger.Warning("Requested Metadata for non-existent channel", channel)
queryCreator := lc.queryCreatorsByChannel[channel]
if queryCreator == nil {
Logger.Warning("Requested Metadata for non-existent channel", channel)
return nil
}
if md, found := lc.deployedCCsByChannel[channel].Lookup(cc); found {
logger.Debug("Returning metadata for channel", channel, ", chaincode", cc, ":", md)
Logger.Debug("Returning metadata for channel", channel, ", chaincode", cc, ":", md)
return &md
}
query, err := newQuery()
query, err := queryCreator.NewQuery()
if err != nil {
logger.Error("Failed obtaining new query for channel", channel, ":", err)
Logger.Error("Failed obtaining new query for channel", channel, ":", err)
return nil
}
md, err := DeployedChaincodes(query, AcceptAll, cc)
if err != nil {
logger.Error("Failed querying LSCC for channel", channel, ":", err)
Logger.Error("Failed querying LSCC for channel", channel, ":", err)
return nil
}
if len(md) == 0 {
logger.Info("Chaincode", cc, "isn't defined in channel", channel)
Logger.Info("Chaincode", cc, "isn't defined in channel", channel)
return nil
}

return &md[0]
}

func (lc *Lifecycle) initMetadataForChannel(channel string, newQuery QueryCreator) error {
func (lc *Lifecycle) initMetadataForChannel(channel string, queryCreator QueryCreator) error {
if lc.isChannelMetadataInitialized(channel) {
return nil
}
// Create a new metadata mapping for the channel
query, err := newQuery()
query, err := queryCreator.NewQuery()
if err != nil {
return errors.WithStack(err)
}
ccs, err := queryChaincodeDefinitions(query, lc.installedCCs, DeployedChaincodes)
if err != nil {
return errors.WithStack(err)
}
lc.createMetadataForChannel(channel, newQuery)
lc.loadMetadataForChannel(channel, ccs)
lc.createMetadataForChannel(channel, queryCreator)
lc.updateState(channel, ccs)
return nil
}

Expand All @@ -146,14 +168,6 @@ func (lc *Lifecycle) isChannelMetadataInitialized(channel string) bool {
return exists
}

func (lc *Lifecycle) loadMetadataForChannel(channel string, ccs chaincode.MetadataSet) {
lc.RLock()
defer lc.RUnlock()
for _, cc := range ccs {
lc.deployedCCsByChannel[channel].Update(cc)
}
}

func (lc *Lifecycle) updateState(channel string, ccUpdate chaincode.MetadataSet) {
lc.RLock()
defer lc.RUnlock()
Expand All @@ -167,20 +181,23 @@ func (lc *Lifecycle) fireChangeListeners(channel string) {
md := lc.deployedCCsByChannel[channel]
lc.RUnlock()
for _, listener := range lc.listeners {
listener.LifeCycleChangeListener(channel, md.Aggregate())
aggregatedMD := md.Aggregate()
listener.LifeCycleChangeListener(channel, aggregatedMD)
}
Logger.Debug("Listeners for channel", channel, "invoked")
}

// NewChannelSubscription subscribes to a channel
func (lc *Lifecycle) NewChannelSubscription(channel string, newQuery QueryCreator) (*Subscription, error) {
func (lc *Lifecycle) NewChannelSubscription(channel string, queryCreator QueryCreator) (*Subscription, error) {
sub := &Subscription{
lc: lc,
channel: channel,
newQuery: newQuery,
lc: lc,
channel: channel,
queryCreator: queryCreator,
pendingUpdates: make(chan *cceventmgmt.ChaincodeDefinition, 1),
}
// Initialize metadata for the channel.
// This loads metadata about all installed chaincodes
if err := lc.initMetadataForChannel(channel, newQuery); err != nil {
if err := lc.initMetadataForChannel(channel, queryCreator); err != nil {
return nil, errors.WithStack(err)
}
lc.fireChangeListeners(channel)
Expand Down
Loading

0 comments on commit e360a6e

Please sign in to comment.