Skip to content

Commit

Permalink
FAB-1291: Couch support for doing a savepoint.
Browse files Browse the repository at this point in the history
Added two functions in couchdb_txmgr.go, recordSavepoint is unexported and called during commit
go record a savepoint into couch. GetBlockNumFromSavepoint should be used during recovery to
get the block number associated with the savepoint

Added similar methods to goleveldb txmgr implementation

Added unit test function to xxx_txmgmt_test.go

Change-Id: Id2860232632d29cbe753b2840a625c34e541f2d9
Signed-off-by: Balaji Viswanathan <[email protected]>
  • Loading branch information
bviswana101 committed Dec 6, 2016
1 parent ede30a4 commit 0183483
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 3 deletions.
30 changes: 30 additions & 0 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,33 @@ func TestDatabaseAutoCreate(t *testing.T) {
}

}

//TestSavepoint tests the recordSavepoint and GetBlockNumfromSavepoint methods for recording and reading a savepoint document
func TestSavepoint(t *testing.T) {

//Only run the tests if CouchDB is explitily enabled in the code,
//otherwise CouchDB may not be installed and all the tests would fail
//TODO replace this with external config property rather than config within the code
if ledgerconfig.IsCouchDBEnabled() == true {

env := newTestEnv(t)

txMgr := NewCouchDBTxMgr(env.conf,
env.couchDBAddress, //couchDB Address
env.couchDatabaseName, //couchDB db name
env.couchUsername, //enter couchDB id
env.couchPassword) //enter couchDB pw

// record savepoint
txMgr.blockNum = 5
err := txMgr.recordSavepoint()
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))

// read the savepoint
blockNum, err := txMgr.GetBlockNumFromSavepoint()
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))
testutil.AssertEquals(t, txMgr.blockNum, blockNum)

txMgr.Shutdown()
}
}
88 changes: 88 additions & 0 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package couchdbtxmgmt

import (
"encoding/json"
"errors"
"sync"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -48,6 +50,15 @@ type updateSet struct {
m map[string]*versionedValue
}

// Savepoint docid (key) for couchdb
const savepointDocID = "statedb_savepoint"

// Savepoint data for couchdb
type couchSavepointData struct {
BlockNum uint64 `json:"BlockNum"`
UpdateSeq string `json:"UpdateSeq"`
}

func newUpdateSet() *updateSet {
return &updateSet{make(map[string]*versionedValue)}
}
Expand All @@ -72,6 +83,7 @@ type CouchDBTxMgr struct {
updateSet *updateSet
commitRWLock sync.RWMutex
couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB
blockNum uint64 // block number corresponding to updateSet
}

// CouchConnection provides connection info for CouchDB
Expand Down Expand Up @@ -119,6 +131,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
for txIndex, envBytes := range block.Data.Data {
// extract actions from the envelope message
Expand Down Expand Up @@ -159,6 +172,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
Transaction: &pb.Transaction{ /* FIXME */ }, Cause: pb.InvalidTransaction_RWConflictDuringCommit})
}
}

logger.Debugf("===COUCHDB=== Exiting CouchDBTxMgr.ValidateAndPrepare()")
return block, invalidTxs, nil
}
Expand Down Expand Up @@ -258,13 +272,87 @@ func (txmgr *CouchDBTxMgr) Commit() error {

}

// Record a savepoint
err := txmgr.recordSavepoint()
if err != nil {
logger.Errorf("===COUCHDB=== Error during recordSavepoint: %s\n", err.Error())
return err
}

logger.Debugf("===COUCHDB=== Exiting CouchDBTxMgr.Commit()")
return nil
}

// recordSavepoint Record a savepoint in statedb.
// Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed.
// Hence we need to fence the savepoint with sync. So ensure_full_commit is called before AND after writing savepoint document
// TODO: Optimization - merge 2nd ensure_full_commit with savepoint by using X-Couch-Full-Commit header
func (txmgr *CouchDBTxMgr) recordSavepoint() error {
var err error
var savepointDoc couchSavepointData
// ensure full commit to flush all changes until now to disk
dbResponse, err := txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
}

// construct savepoint document
// UpdateSeq would be useful if we want to get all db changes since a logical savepoint
dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo()
if err != nil {
logger.Errorf("====COUCHDB==== Failed to get DB info %s\n", err.Error())
return err
}
savepointDoc.BlockNum = txmgr.blockNum
savepointDoc.UpdateSeq = dbInfo.UpdateSeq

savepointDocJSON, err := json.Marshal(savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to create savepoint data %s\n", err.Error())
return err
}

// SaveDoc using couchdb client and use JSON format
_, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil)
if err != nil {
logger.Errorf("====CouchDB==== Failed to save the savepoint to DB %s\n", err.Error())
}

// ensure full commit to flush savepoint to disk
dbResponse, err = txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
}
return nil
}

// GetBlockNumFromSavepoint Reads the savepoint from database and returns the corresponding block number.
// If no savepoint is found, it returns 0
func (txmgr *CouchDBTxMgr) GetBlockNumFromSavepoint() (uint64, error) {
var err error
savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID)
if err != nil {
// TODO: differentiate between 404 and some other error code
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
return 0, err
}

savepointDoc := &couchSavepointData{}
err = json.Unmarshal(savepointJSON, &savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
return 0, err
}

return savepointDoc.BlockNum, nil
}

// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *CouchDBTxMgr) Rollback() {
txmgr.updateSet = nil
txmgr.blockNum = 0
}

func (txmgr *CouchDBTxMgr) getCommitedVersion(ns string, key string) (*version.Height, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package lockbasedtxmgmt

import (
"fmt"
"math"
"testing"

"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -67,9 +68,13 @@ func TestTxSimulatorWithExistingData(t *testing.T) {
testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err))
testutil.AssertSame(t, isValid, true)
txMgr.addWriteSetToBatch(txRWSet, version.NewHeight(1, 1))
txMgr.blockNum = math.MaxUint64
err = txMgr.Commit()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err))

blockNum, err := txMgr.GetBlockNumFromSavepoint()
testutil.AssertEquals(t, blockNum, txMgr.blockNum)

// simulate tx2 that make changes to existing data
s2, _ := txMgr.NewTxSimulator()
value, _ := s2.GetState("ns1", "key1")
Expand Down Expand Up @@ -258,7 +263,7 @@ func testIterator(t *testing.T, numKeys int, startKeyNum int, endKeyNum int) {
keyNum := begin + count
k := kv.(*ledger.KV).Key
v := kv.(*ledger.KV).Value
t.Logf("Retrieved k=%s, v=%s", k, v)
t.Logf("Retrieved k=%s, v=%s at count=%d start=%s end=%s", k, v, count, startKey, endKey)
testutil.AssertEquals(t, k, createTestKey(keyNum))
testutil.AssertEquals(t, v, createTestValue(keyNum))
count++
Expand Down
31 changes: 31 additions & 0 deletions core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package lockbasedtxmgmt

import (
"bytes"
"encoding/binary"
"reflect"
"sync"

"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -50,6 +52,9 @@ type updateSet struct {
m map[string]*versionedValue
}

// savepoint key
const savepointKey = "savepoint"

func newUpdateSet() *updateSet {
return &updateSet{make(map[string]*versionedValue)}
}
Expand All @@ -73,6 +78,7 @@ type LockBasedTxMgr struct {
db *db.DB
updateSet *updateSet
commitRWLock sync.RWMutex
blockNum uint64
}

// NewLockBasedTxMgr constructs a `LockBasedTxMgr`
Expand Down Expand Up @@ -102,6 +108,7 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bl
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
for txIndex, envBytes := range block.Data.Data {
// extract actions from the envelope message
Expand Down Expand Up @@ -203,6 +210,15 @@ func (txmgr *LockBasedTxMgr) Commit() error {
batch.Put([]byte(k), encodeValue(v.value, v.version))
}
}

// record the savepoint along with batch
if txmgr.blockNum != 0 {
savepointValue := make([]byte, reflect.TypeOf(txmgr.blockNum).Size())
binary.LittleEndian.PutUint64(savepointValue, txmgr.blockNum)
// Need a composite key for iterator to function correctly - use separator itself as special/hidden namespace
batch.Put(constructCompositeKey(string(compositeKeySep), savepointKey), savepointValue)
}

txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
defer func() { txmgr.updateSet = nil }()
Expand All @@ -212,9 +228,24 @@ func (txmgr *LockBasedTxMgr) Commit() error {
return nil
}

// GetBlockNumFromSavepoint returns the block num recorded in savepoint,
// returns 0 if NO savepoint is found
func (txmgr *LockBasedTxMgr) GetBlockNumFromSavepoint() (uint64, error) {
var blockNum uint64
savepointValue, err := txmgr.db.Get(constructCompositeKey(string(compositeKeySep), savepointKey))
if err != nil {
return 0, err
}

// savepointValue is not encoded with version
blockNum = binary.LittleEndian.Uint64(savepointValue)
return blockNum, nil
}

// Rollback implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Rollback() {
txmgr.updateSet = nil
txmgr.blockNum = 0
}

func (txmgr *LockBasedTxMgr) getCommitedVersion(ns string, key string) (*version.Height, error) {
Expand Down
36 changes: 34 additions & 2 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,38 @@ func (dbclient *CouchDBConnectionDef) DropDatabase() (*DBOperationResponse, erro

}

// EnsureFullCommit calls _ensure_full_commit for explicit fsync
func (dbclient *CouchDBConnectionDef) EnsureFullCommit() (*DBOperationResponse, error) {

logger.Debugf("===COUCHDB=== Entering EnsureFullCommit()")

url := fmt.Sprintf("%s/%s/_ensure_full_commit", dbclient.URL, dbclient.Database)

resp, _, err := dbclient.handleRequest(http.MethodPost, url, nil, "", "")
if err != nil {
logger.Errorf("====COUCHDB==== Failed to invoke _ensure_full_commit Error: %s\n", err.Error())
return nil, err
}
defer resp.Body.Close()

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)

if dbResponse.Ok == true {
logger.Debugf("===COUCHDB=== _ensure_full_commit database %s ", dbclient.Database)
}

logger.Debugf("===COUCHDB=== Exiting EnsureFullCommit()")

if dbResponse.Ok == true {

return dbResponse, nil

}

return dbResponse, fmt.Errorf("Error syncing database")
}

//SaveDoc method provides a function to save a document, id and byte array
func (dbclient *CouchDBConnectionDef) SaveDoc(id string, rev string, bytesDoc []byte, attachments []Attachment) (string, error) {

Expand Down Expand Up @@ -497,7 +529,7 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.
}

//add content header for PUT
if method == http.MethodPut {
if method == http.MethodPut || method == http.MethodPost {

//If the multipartBoundary is not set, then this is a JSON and content-type should be set
//to application/json. Else, this is contains an attachment and needs to be multipart
Expand All @@ -514,7 +546,7 @@ func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.
}

//add content header for PUT
if method == http.MethodPut {
if method == http.MethodPut || method == http.MethodPost {
req.Header.Set("Accept", "application/json")
}

Expand Down

0 comments on commit 0183483

Please sign in to comment.