Skip to content

Commit

Permalink
[FAB-9923/FAB-10096] Use stored coll. conf for pull
Browse files Browse the repository at this point in the history
This commit makes use of the stored collection config which was
available at endorsement time to support pull of the missing private
data.

Change-Id: Ic10f3142f769e6afa7cfff70553fa3093244a08e
Signed-off-by: Artem Barger <[email protected]>
  • Loading branch information
C0rWin authored and mastersingh24 committed May 19, 2018
1 parent 71983e8 commit d6ba31b
Show file tree
Hide file tree
Showing 10 changed files with 551 additions and 185 deletions.
15 changes: 10 additions & 5 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,25 @@ func (c *coordinator) fetchFromTransientStore(txAndSeq txAndSeqInBlock, filter l
}
defer iterator.Close()
for {
res, err := iterator.Next()
res, err := iterator.NextWithConfig()
if err != nil {
logger.Warning("Failed iterating:", err)
logger.Error("Failed iterating:", err)
break
}
if res == nil {
// End of iteration
break
}
if res.PvtSimulationResults == nil {
logger.Warning("Resultset's PvtSimulationResults for", txAndSeq.txID, "is nil, skipping")
if res.PvtSimulationResultsWithConfig == nil {
logger.Warning("Resultset's PvtSimulationResultsWithConfig for", txAndSeq.txID, "is nil, skipping")
continue
}
for _, ns := range res.PvtSimulationResults.NsPvtRwset {
simRes := res.PvtSimulationResultsWithConfig
if simRes.PvtRwset == nil {
logger.Warning("The PvtRwset of PvtSimulationResultsWithConfig for", txAndSeq.txID, "is nil, skipping")
continue
}
for _, ns := range simRes.PvtRwset.NsPvtRwset {
for _, col := range ns.CollectionPvtRwset {
key := rwSetKey{
txID: txAndSeq.txID,
Expand Down
52 changes: 37 additions & 15 deletions gossip/privdata/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,35 @@ func (store *mockTransientStore) GetTxPvtRWSetByTxid(txid string, filter ledger.

type mockRWSetScanner struct {
err error
results []*transientstore.EndorserPvtSimulationResults
results []*transientstore.EndorserPvtSimulationResultsWithConfig
}

func (scanner *mockRWSetScanner) withRWSet(ns string, col string) *mockRWSetScanner {
scanner.results = append(scanner.results, &transientstore.EndorserPvtSimulationResults{
PvtSimulationResults: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: ns,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
scanner.results = append(scanner.results, &transientstore.EndorserPvtSimulationResultsWithConfig{
PvtSimulationResultsWithConfig: &transientstore2.TxPvtReadWriteSetWithConfigInfo{
PvtRwset: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: ns,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: col,
Rwset: []byte("rws-pre-image"),
},
},
},
},
},
CollectionConfigs: map[string]*common.CollectionConfigPackage{
ns: {
Config: []*common.CollectionConfig{
{
CollectionName: col,
Rwset: []byte("rws-pre-image"),
Payload: &common.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &common.StaticCollectionConfig{
Name: col,
},
},
},
},
},
Expand All @@ -141,21 +156,21 @@ func (scanner *mockRWSetScanner) withRWSet(ns string, col string) *mockRWSetScan
}

func (scanner *mockRWSetScanner) Next() (*transientstore.EndorserPvtSimulationResults, error) {
panic("should not be used")
}

func (scanner *mockRWSetScanner) NextWithConfig() (*transientstore.EndorserPvtSimulationResultsWithConfig, error) {
if scanner.err != nil {
return nil, scanner.err
}
var res *transientstore.EndorserPvtSimulationResults
var res *transientstore.EndorserPvtSimulationResultsWithConfig
if len(scanner.results) == 0 {
return nil, nil
}
res, scanner.results = scanner.results[len(scanner.results)-1], scanner.results[:len(scanner.results)-1]
return res, nil
}

func (scanner *mockRWSetScanner) NextWithConfig() (*transientstore.EndorserPvtSimulationResultsWithConfig, error) {
return nil, nil
}

func (*mockRWSetScanner) Close() {
}

Expand Down Expand Up @@ -279,6 +294,7 @@ func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall
}

func (f *fetcherMock) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
fmt.Println("XXX: Expected endorsers", f.expectedEndorsers)
for _, endorsements := range dig2src {
for _, endorsement := range endorsements {
_, exists := f.expectedEndorsers[string(endorsement.Endorser)]
Expand Down Expand Up @@ -882,6 +898,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()

fmt.Println("Scenario I")
// Scenario I: Block we got has sufficient private data alongside it.
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
// because we didn't define yet the "On(...)" invocation of the transient store or other peers.
Expand All @@ -898,6 +915,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assertCommitHappened()
assertPurged("tx1", "tx2")

fmt.Println("Scenario II")
// Scenario II: Block we got doesn't have sufficient private data alongside it,
// it is missing ns1: c2, but the data exists in the transient store
store.On("GetTxPvtRWSetByTxid", "tx1", mock.Anything).Return((&mockRWSetScanner{}).withRWSet("ns1", "c2"), nil)
Expand All @@ -914,6 +932,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
},
}, store.lastReqFilter)

fmt.Println("Scenario III")
// Scenario III: Block doesn't have sufficient private data alongside it,
// it is missing ns1: c2, and the data exists in the transient store,
// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
Expand Down Expand Up @@ -956,6 +975,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assert.NoError(t, err)
assertCommitHappened()

fmt.Println("Scenario IV")
// Scenario IV: Block came with more than sufficient private data alongside it, some of it is redundant.
pvtData = pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2", "c3").
addRWSet().addNSRWSet("ns2", "c1", "c3").addRWSet().addNSRWSet("ns1", "c4").create()
Expand All @@ -964,6 +984,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assert.NoError(t, err)
assertCommitHappened()

fmt.Println("Scenario V")
// Scenario V: Block didn't get with any private data alongside it, and the transient store
// has some problem.
// In this case, we should try to fetch data from peers.
Expand Down Expand Up @@ -1010,6 +1031,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
assert.NoError(t, err)
assertCommitHappened()

fmt.Println("Scenario VI")
// Scenario VI: Block contains 2 transactions, and the peer is eligible for only tx3-ns3-c3.
// Also, the blocks comes with a private data for tx3-ns3-c3 so that the peer won't have to fetch the
// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
Expand Down
153 changes: 114 additions & 39 deletions gossip/privdata/dataretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)
Expand All @@ -18,7 +19,7 @@ import (
type StorageDataRetriever interface {
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
CollectionRWSet(dig *gossip2.PvtDataDigest) []util.PrivateRWSet
CollectionRWSet(dig *gossip2.PvtDataDigest) *util.PrivateRWSetWithConfig
}

// DataStore defines set of APIs need to get private data
Expand Down Expand Up @@ -52,69 +53,129 @@ func NewDataRetriever(store DataStore) StorageDataRetriever {

// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) []util.PrivateRWSet {
func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) *util.PrivateRWSetWithConfig {
filter := map[string]ledger.PvtCollFilter{
dig.Namespace: map[string]bool{
dig.Collection: true,
},
}

pRWsets := []util.PrivateRWSet{}

height, err := dr.store.LedgerHeight()
if err != nil {
// if there is an error getting info from the ledger, we need to try to read from transient store
logger.Warning("Wasn't able to read ledger height, due to", err, "trying to lookup "+
logger.Error("Wasn't able to read ledger height, due to", err, "trying to lookup "+
"private data from transient store, namespace", dig.Namespace, "collection name", dig.Collection, "txID", dig.TxId)
return nil
}
if height <= dig.BlockSeq {
logger.Debug("Current ledger height ", height, "is below requested block sequence number",
dig.BlockSeq, "retrieving private data from transient store, namespace", dig.Namespace, "collection name",
dig.Collection, "txID", dig.TxId)
}
if err != nil || height <= dig.BlockSeq { // Check whenever current ledger height is equal or above block sequence num.
if height <= dig.BlockSeq { // Check whenever current ledger height is equal or above block sequence num.
return dr.fromTransientStore(dig, filter)
}
// Since ledger height is above block sequence number private data is might be available in the ledger
return dr.fromLedger(dig, filter)
}

func (dr *dataRetriever) fromLedger(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) *util.PrivateRWSetWithConfig {
results := &util.PrivateRWSetWithConfig{}
pvtData, err := dr.store.GetPvtDataByNum(dig.BlockSeq, filter)
if err != nil {
logger.Error("Wasn't able to obtain private data for collection", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err)
return nil
}

for _, data := range pvtData {
if data.WriteSet == nil {
logger.Warning("Received nil write set for collection", dig.Collection, "namespace", dig.Namespace)
continue
}
pvtRWSet := dr.extractPvtRWsets(data.WriteSet.NsPvtRwset, dig.Namespace, dig.Collection)
results.RWSet = append(results.RWSet, pvtRWSet...)
}

confHistoryRetriever, err := dr.store.GetConfigHistoryRetriever()
if err != nil {
logger.Error("Cannot obtain configuration history retriever, for collection,", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err)
return nil
}

configInfo, err := confHistoryRetriever.MostRecentCollectionConfigBelow(dig.BlockSeq, dig.Namespace)
if err != nil {
logger.Error("Cannot find recent collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace)
return nil
}

if configInfo == nil {
logger.Error("No collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace, "is available")
return nil
}
configs := dr.extractCollectionConfigs(configInfo.CollectionConfig, dig)
if configs == nil {
logger.Error("No collection config was found for collection", dig.Collection,
"namespace", dig.Namespace, "txID", dig.TxId)
return nil
}
results.CollectionConfig = configs
return results
}

func (dr *dataRetriever) fromTransientStore(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) *util.PrivateRWSetWithConfig {
results := &util.PrivateRWSetWithConfig{}
it, err := dr.store.GetTxPvtRWSetByTxid(dig.TxId, filter)
if err != nil {
logger.Error("Was not able to retrieve private data from transient store, namespace", dig.Namespace,
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err)
return nil
}
defer it.Close()

it, err := dr.store.GetTxPvtRWSetByTxid(dig.TxId, filter)
for {
res, err := it.NextWithConfig()
if err != nil {
logger.Error("Was not able to retrieve private data from transient store, namespace", dig.Namespace,
logger.Error("Error getting next element out of private data iterator, namespace", dig.Namespace,
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err)
return nil
}
defer it.Close()

for {
res, err := it.Next()
if err != nil {
logger.Error("Error getting next element out of private data iterator, namespace", dig.Namespace,
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err)
return nil
}
if res == nil {
return pRWsets
}
rws := res.PvtSimulationResults
if rws == nil {
logger.Debug("Skipping empty PvtSimulationResults received at block height", res.ReceivedAtBlockHeight)
continue
}
pRWsets = append(pRWsets, dr.extractPvtRWsets(rws.NsPvtRwset, dig.Namespace, dig.Collection)...)
if res == nil {
return results
}
} else { // Since ledger height is above block sequence number private data is available in the ledger
pvtData, err := dr.store.GetPvtDataByNum(dig.BlockSeq, filter)
if err != nil {
logger.Error("Wasn't able to obtain private data for collection", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err)
rws := res.PvtSimulationResultsWithConfig
if rws == nil {
logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
continue
}
for _, data := range pvtData {
if data.WriteSet == nil {
logger.Warning("Received nil write set for collection", dig.Collection, "namespace", dig.Namespace)
continue
}
pRWsets = append(pRWsets, dr.extractPvtRWsets(data.WriteSet.NsPvtRwset, dig.Namespace, dig.Collection)...)
txPvtRWSet := rws.PvtRwset
if txPvtRWSet == nil {
logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
continue
}
}

return pRWsets
colConfigs, found := rws.CollectionConfigs[dig.Namespace]
if !found {
logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name",
dig.Namespace, "txID", dig.TxId)
continue
}
configs := dr.extractCollectionConfigs(colConfigs, dig)
if configs == nil {
logger.Error("No collection config was found for collection", dig.Collection,
"namespace", dig.Namespace, "txID", dig.TxId)
continue
}

pvtRWSet := dr.extractPvtRWsets(txPvtRWSet.NsPvtRwset, dig.Namespace, dig.Collection)
// TODO: Next CR will extend TxPvtReadWriteSetWithConfigInfo to have ledger height of
// endorsement time to be used here in order to select most updated collection config.
results.CollectionConfig = configs
results.RWSet = append(results.RWSet, pvtRWSet...)
}
}

func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet, namespace string, collectionName string) []util.PrivateRWSet {
Expand All @@ -140,3 +201,17 @@ func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet,

return pRWsets
}

func (dr *dataRetriever) extractCollectionConfigs(configPackage *common.CollectionConfigPackage, digest *gossip2.PvtDataDigest) *common.CollectionConfig {
for _, config := range configPackage.Config {
switch cconf := config.Payload.(type) {
case *common.CollectionConfig_StaticCollectionConfig:
if cconf.StaticCollectionConfig.Name == digest.Collection {
return config
}
default:
return nil
}
}
return nil
}
Loading

0 comments on commit d6ba31b

Please sign in to comment.