Skip to content

Commit

Permalink
FAB-1336 Add new ledger blockstorage index.
Browse files Browse the repository at this point in the history
Add a new ledger blockstorage index for History
that will map (blocknum,trannum) to the file storage
location for this block transaction

This index will be used for the API  GetTransactionsForKey()
for (chaincode1,key1).  It will do a key range query on chaincode1~key1
to pick up all chaincode1~key1 records.  Results will indicate
the set of (blocknum,trannum) transactions that updated this key.

Change-Id: I81da09e5526d7e2966634c78a03d34011d514442
Signed-off-by: Mari Wade <[email protected]>
  • Loading branch information
mariwade committed Dec 13, 2016
1 parent d18aa98 commit 458c521
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 28 deletions.
7 changes: 4 additions & 3 deletions core/ledger/blkstorage/blockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type IndexableAttr string

// constants for indexable attributes
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum")
)

// IndexConfig - a configuration that includes a list of attributes that should be indexed
Expand Down
23 changes: 16 additions & 7 deletions core/ledger/blkstorage/fsblkstorage/block_serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ import (

type serializedBlockInfo struct {
blockHeader *common.BlockHeader
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}

//The order of the transactions must be maintained for history
type txindexInfo struct {
txID string
loc *locPointer
}

func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) {
Expand Down Expand Up @@ -94,8 +100,9 @@ func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error {
return nil
}

func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) {
txOffsets := make(map[string]*locPointer)
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) {
var txOffsets []*txindexInfo

if err := buf.EncodeVarint(uint64(len(blockData.Data))); err != nil {
return nil, err
}
Expand All @@ -108,7 +115,8 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*l
if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil {
return nil, err
}
txOffsets[txid] = &locPointer{offset, len(buf.Bytes()) - offset}
idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}}
txOffsets = append(txOffsets, idxInfo)
}
return txOffsets, nil
}
Expand Down Expand Up @@ -147,9 +155,9 @@ func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error) {
return header, nil
}

func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) {
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) {
data := &common.BlockData{}
txOffsets := make(map[string]*locPointer)
var txOffsets []*txindexInfo
var numItems uint64
var err error

Expand All @@ -167,7 +175,8 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPoin
return nil, nil, err
}
data.Data = append(data.Data, txEnvBytes)
txOffsets[txid] = &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}
idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
txOffsets = append(txOffsets, idxInfo)
}
return data, txOffsets, nil
}
Expand Down
12 changes: 8 additions & 4 deletions core/ledger/blkstorage/fsblkstorage/block_serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ func TestSerializedBlockInfo(t *testing.T) {
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, infoFromBB, info)
testutil.AssertEquals(t, len(info.txOffsets), len(block.Data.Data))
for _, txEnvBytes := range block.Data.Data {
for txIndex, txEnvBytes := range block.Data.Data {
txid, err := extractTxID(txEnvBytes)
testutil.AssertNoError(t, err, "")
offset, ok := info.txOffsets[txid]
testutil.AssertEquals(t, ok, true)
b := bb[offset.offset:]

indexInfo := info.txOffsets[txIndex]
indexTxID := indexInfo.txID
indexOffset := indexInfo.loc

testutil.AssertEquals(t, txid, indexTxID)
b := bb[indexOffset.offset:]
len, num := proto.DecodeVarint(b)
txEnvBytesFromBB := b[num : num+int(len)]
testutil.AssertEquals(t, txEnvBytesFromBB, txEnvBytes)
Expand Down
13 changes: 11 additions & 2 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for _, txOffset := range txOffsets {
txOffset.offset += len(blockBytesEncodedLen)
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
Expand Down Expand Up @@ -363,7 +363,7 @@ func (mgr *blockfileMgr) syncIndex() error {
return err
}
for _, offset := range info.txOffsets {
offset.offset += int(blockPlacementInfo.blockBytesOffset)
offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
Expand Down Expand Up @@ -456,6 +456,15 @@ func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*pb.Transaction,
return mgr.fetchTransaction(loc)
}

func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*pb.Transaction, error) {
logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
return mgr.fetchTransaction(loc)
}

func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
blockBytes, err := mgr.fetchBlockBytes(lp)
if err != nil {
Expand Down
60 changes: 51 additions & 9 deletions core/ledger/blkstorage/fsblkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
)

const (
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
indexCheckpointKeyStr = "indexCheckpointKey"
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
blockNumTranNumIdxKeyPrefix = 'a'
indexCheckpointKeyStr = "indexCheckpointKey"
)

var indexCheckpointKey = []byte(indexCheckpointKeyStr)
Expand All @@ -41,13 +42,14 @@ type index interface {
getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
getTxLoc(txID string) (*fileLocPointer, error)
getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
}

type blockIdxInfo struct {
blockNum uint64
blockHash []byte
flp *fileLocPointer
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}

type blockIndex struct {
Expand Down Expand Up @@ -89,25 +91,42 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
return err
}

//Index1
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
}

//Index2
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
}

//Index3 Used to find a transactin by it's transaction id
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
for txid, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset)
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txid)
for _, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructTxIDKey(txid), txFlpBytes)
batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
}
}

//Index4 - Store BlockNumTranNum will be used to query history data
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
for txIterator, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
}
}

batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil {
return err
Expand Down Expand Up @@ -163,6 +182,22 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return txFLP, nil
}

func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
if err != nil {
return nil, err
}
if b == nil {
return nil, blkstorage.ErrNotFoundInIndex
}
txFLP := &fileLocPointer{}
txFLP.unmarshal(b)
return txFLP, nil
}

func constructBlockNumKey(blockNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
Expand All @@ -176,6 +211,13 @@ func constructTxIDKey(txID string) []byte {
return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
}

func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum)
key := append(blkNumBytes, tranNumBytes...)
return append([]byte{blockNumTranNumIdxKeyPrefix}, key...)
}

func constructTxID(blockNum uint64, txNum int) string {
return fmt.Sprintf("%d:%d", blockNum, txNum)
}
Expand Down
16 changes: 16 additions & 0 deletions core/ledger/blkstorage/fsblkstorage/blockindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, err
func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return nil, nil
}
func (i *noopIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
return nil, nil
}

func TestBlockIndexSync(t *testing.T) {
testBlockIndexSync(t, 10, 5, false)
Expand Down Expand Up @@ -103,7 +106,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) {
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum})
}

func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
Expand Down Expand Up @@ -149,4 +154,15 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}

//test 'retrieveTrasnactionsByBlockNumTranNum
tx2, err := blockfileMgr.retrieveTransactionForBlockNumTranNum(1, 1)
if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNumTranNum) {
testutil.AssertNoError(t, err, "Error while retrieving tx by blockNum and tranNum")
txOrig2, err2 := extractTransaction(blocks[0].Data.Data[0])
testutil.AssertNoError(t, err2, "")
testutil.AssertEquals(t, tx2, txOrig2)
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}
}
1 change: 1 addition & 0 deletions core/ledger/blkstorage/fsblkstorage/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func newTestEnv(t testing.TB) *testEnv {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
os.RemoveAll(conf.dbPath)
os.RemoveAll(conf.blockfilesDir)
Expand Down
1 change: 1 addition & 0 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
Expand Down
16 changes: 13 additions & 3 deletions core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator, _ := ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value1"))
simulator.SetState("ns1", "key5", []byte("value2"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
Expand All @@ -195,6 +195,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})

//Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled
simulationResults := [][]byte{}
simulator, _ = ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value3"))
simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
Expand All @@ -203,7 +204,16 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
block2 := bg.NextBlock([][]byte{simRes}, false)
simulationResults = append(simulationResults, simRes)
//add a 2nd transaction
simulator2, _ := ledger.NewTxSimulator()
simulator2.SetState("ns1", "key9", []byte("value5"))
simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator2.Done()
simRes2, _ := simulator2.GetTxSimulationResults()
simulationResults = append(simulationResults, simRes2)

block2 := bg.NextBlock(simulationResults, false)
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.Commit()

Expand All @@ -225,6 +235,6 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
testutil.AssertEquals(t, b2, block2)

if ledgerconfig.IsHistoryDBEnabled() == true {
//TODO history specific test
//TODO history specific test once the query api's are in and we can validate content
}
}

0 comments on commit 458c521

Please sign in to comment.