diff --git a/internal/bcdb/db.go b/internal/bcdb/db.go index f80243a1..f5343910 100644 --- a/internal/bcdb/db.go +++ b/internal/bcdb/db.go @@ -20,7 +20,6 @@ import ( "github.com/IBM-Blockchain/bcdb-server/pkg/crypto" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" "github.com/IBM-Blockchain/bcdb-server/pkg/types" - "github.com/google/uuid" "github.com/pkg/errors" ) @@ -112,10 +111,6 @@ type DB interface { // timeout error will be returned SubmitTransaction(tx interface{}, timeout time.Duration) (*types.TxReceiptResponseEnvelope, error) - // BootstrapDB given bootstrap configuration initialize database by - // creating required system tables to include database meta data - BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) - // IsDBExists returns true if database with given name is exists otherwise false IsDBExists(name string) bool @@ -138,7 +133,8 @@ type db struct { } // NewDB creates a new database bcdb which handles both the queries and transactions. -func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB, error) { +func NewDB(conf *config.Configurations, logger *logger.SugarLogger) (DB, error) { + localConf := conf.LocalConfig if localConf.Server.Database.Name != "leveldb" { return nil, errors.New("only leveldb is supported as the state database") } @@ -223,17 +219,12 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB txProcessor, err := newTransactionProcessor( &txProcessorConfig{ - nodeID: localConf.Server.Identity.ID, - db: levelDB, - blockStore: blockStore, - provenanceStore: provenanceStore, - stateTrieStore: stateTrieStore, - txQueueLength: localConf.Server.QueueLength.Transaction, - txBatchQueueLength: localConf.Server.QueueLength.ReorderedTransactionBatch, - blockQueueLength: localConf.Server.QueueLength.Block, - maxTxCountPerBatch: localConf.BlockCreation.MaxTransactionCountPerBlock, - batchTimeout: localConf.BlockCreation.BlockTimeout, - logger: logger, + config: conf, + db: levelDB, + blockStore: blockStore, + provenanceStore: provenanceStore, + stateTrieStore: stateTrieStore, + logger: logger, }, ) if err != nil { @@ -255,20 +246,6 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB }, nil } -// BootstrapDB bootstraps DB with system tables -func (d *db) BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) { - configTx, err := prepareConfigTx(conf) - if err != nil { - return nil, errors.Wrap(err, "failed to prepare and commit a configuration transaction") - } - - resp, err := d.SubmitTransaction(configTx, 30*time.Second) - if err != nil { - return nil, errors.Wrap(err, "error while committing configuration transaction") - } - return resp, nil -} - // LedgerHeight returns ledger height func (d *db) LedgerHeight() (uint64, error) { return d.worldstateQueryProcessor.blockStore.Height() @@ -752,105 +729,6 @@ func (d *db) signature(response interface{}) ([]byte, error) { return d.signer.Sign(responseBytes) } -func prepareConfigTx(conf *config.Configurations) (*types.ConfigTxEnvelope, error) { - certs, err := readCerts(conf) - if err != nil { - return nil, err - } - - inNodes := false - var nodes []*types.NodeConfig - for _, node := range conf.SharedConfig.Nodes { - nc := &types.NodeConfig{ - Id: node.NodeID, - Address: node.Host, - Port: node.Port, - } - if cert, ok := certs.nodeCertificates[node.NodeID]; ok { - nc.Certificate = cert - } else { - return nil, errors.Errorf("Cannot find certificate for node: %s", node.NodeID) - } - nodes = append(nodes, nc) - - if node.NodeID == conf.LocalConfig.Server.Identity.ID { - inNodes = true - } - } - if !inNodes { - return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Nodes: %v", conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Nodes) - } - - clusterConfig := &types.ClusterConfig{ - Nodes: nodes, - Admins: []*types.Admin{ - { - Id: conf.SharedConfig.Admin.ID, - Certificate: certs.adminCert, - }, - }, - CertAuthConfig: certs.caCerts, - ConsensusConfig: &types.ConsensusConfig{ - Algorithm: conf.SharedConfig.Consensus.Algorithm, - Members: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Members)), - Observers: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Observers)), - RaftConfig: &types.RaftConfig{ - TickInterval: conf.SharedConfig.Consensus.RaftConfig.TickInterval, - ElectionTicks: conf.SharedConfig.Consensus.RaftConfig.ElectionTicks, - HeartbeatTicks: conf.SharedConfig.Consensus.RaftConfig.HeartbeatTicks, - }, - }, - } - - inMembers := false - for i, m := range conf.SharedConfig.Consensus.Members { - clusterConfig.ConsensusConfig.Members[i] = &types.PeerConfig{ - NodeId: m.NodeId, - RaftId: m.RaftId, - PeerHost: m.PeerHost, - PeerPort: m.PeerPort, - } - if m.NodeId == conf.LocalConfig.Server.Identity.ID { - inMembers = true - } - } - - inObservers := false - for i, m := range conf.SharedConfig.Consensus.Observers { - clusterConfig.ConsensusConfig.Observers[i] = &types.PeerConfig{ - NodeId: m.NodeId, - RaftId: m.RaftId, - PeerHost: m.PeerHost, - PeerPort: m.PeerPort, - } - if m.NodeId == conf.LocalConfig.Server.Identity.ID { - inObservers = true - } - } - - if !inMembers && !inObservers { - return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Consensus Members or Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - if inObservers && inMembers { - return nil, errors.Errorf("local Server.Identity.ID [%s] cannot be in SharedConfig.Consensus both Members and Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - // TODO add support for observers, see issue: https://github.ibm.com/blockchaindb/server/issues/403 - if inObservers { - return nil, errors.Errorf("not supported yet: local Server.Identity.ID [%s] is in SharedConfig.Consensus.Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - - return &types.ConfigTxEnvelope{ - Payload: &types.ConfigTx{ - TxId: uuid.New().String(), // TODO: we need to change TxID to string - NewConfig: clusterConfig, - }, - // TODO: we can make the node itself sign the transaction - }, nil -} - type certsInGenesisConfig struct { nodeCertificates map[string][]byte adminCert []byte diff --git a/internal/bcdb/mocks/db.go b/internal/bcdb/mocks/db.go index 704a39fd..5c6558bf 100644 --- a/internal/bcdb/mocks/db.go +++ b/internal/bcdb/mocks/db.go @@ -2,38 +2,19 @@ package mocks -import config "github.com/IBM-Blockchain/bcdb-server/config" -import mock "github.com/stretchr/testify/mock" -import time "time" -import types "github.com/IBM-Blockchain/bcdb-server/pkg/types" -import x509 "crypto/x509" +import ( + time "time" -// DB is an autogenerated mock type for the DB type -type DB struct { - mock.Mock -} + mock "github.com/stretchr/testify/mock" -// BootstrapDB provides a mock function with given fields: conf -func (_m *DB) BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) { - ret := _m.Called(conf) + types "github.com/IBM-Blockchain/bcdb-server/pkg/types" - var r0 *types.TxReceiptResponseEnvelope - if rf, ok := ret.Get(0).(func(*config.Configurations) *types.TxReceiptResponseEnvelope); ok { - r0 = rf(conf) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.TxReceiptResponseEnvelope) - } - } + x509 "crypto/x509" +) - var r1 error - if rf, ok := ret.Get(1).(func(*config.Configurations) error); ok { - r1 = rf(conf) - } else { - r1 = ret.Error(1) - } - - return r0, r1 +// DB is an autogenerated mock type for the DB type +type DB struct { + mock.Mock } // Close provides a mock function with given fields: diff --git a/internal/bcdb/transaction_processor.go b/internal/bcdb/transaction_processor.go index 7fb68050..ac555246 100644 --- a/internal/bcdb/transaction_processor.go +++ b/internal/bcdb/transaction_processor.go @@ -8,9 +8,11 @@ import ( "sync" "time" + "github.com/IBM-Blockchain/bcdb-server/config" "github.com/IBM-Blockchain/bcdb-server/internal/blockcreator" "github.com/IBM-Blockchain/bcdb-server/internal/blockprocessor" "github.com/IBM-Blockchain/bcdb-server/internal/blockstore" + "github.com/IBM-Blockchain/bcdb-server/internal/comm" internalerror "github.com/IBM-Blockchain/bcdb-server/internal/errors" "github.com/IBM-Blockchain/bcdb-server/internal/mptrie" "github.com/IBM-Blockchain/bcdb-server/internal/provenance" @@ -20,6 +22,7 @@ import ( "github.com/IBM-Blockchain/bcdb-server/internal/worldstate" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" "github.com/IBM-Blockchain/bcdb-server/pkg/types" + "github.com/google/uuid" "github.com/pkg/errors" ) @@ -35,6 +38,7 @@ type transactionProcessor struct { txReorderer *txreorderer.TxReorderer blockCreator *blockcreator.BlockCreator blockReplicator *replication.BlockReplicator + peerTransport *comm.HTTPTransport blockProcessor *blockprocessor.BlockProcessor blockStore *blockstore.Store pendingTxs *pendingTxs @@ -43,70 +47,109 @@ type transactionProcessor struct { } type txProcessorConfig struct { - nodeID string - db worldstate.DB - blockStore *blockstore.Store - provenanceStore *provenance.Store - stateTrieStore mptrie.Store - txQueueLength uint32 - txBatchQueueLength uint32 - blockQueueLength uint32 - maxTxCountPerBatch uint32 - batchTimeout time.Duration - logger *logger.SugarLogger + config *config.Configurations + db worldstate.DB + blockStore *blockstore.Store + provenanceStore *provenance.Store + stateTrieStore mptrie.Store + logger *logger.SugarLogger } func newTransactionProcessor(conf *txProcessorConfig) (*transactionProcessor, error) { p := &transactionProcessor{} - p.nodeID = conf.nodeID + localConfig := conf.config.LocalConfig + + p.nodeID = localConfig.Server.Identity.ID p.logger = conf.logger - p.txQueue = queue.New(conf.txQueueLength) - p.txBatchQueue = queue.New(conf.txBatchQueueLength) + p.txQueue = queue.New(localConfig.Server.QueueLength.Transaction) + p.txBatchQueue = queue.New(localConfig.Server.QueueLength.ReorderedTransactionBatch) p.blockOneQueueBarrier = queue.NewOneQueueBarrier(conf.logger) p.txReorderer = txreorderer.New( &txreorderer.Config{ TxQueue: p.txQueue, TxBatchQueue: p.txBatchQueue, - MaxTxCountPerBatch: conf.maxTxCountPerBatch, - BatchTimeout: conf.batchTimeout, + MaxTxCountPerBatch: localConfig.BlockCreation.MaxTransactionCountPerBlock, + BatchTimeout: localConfig.BlockCreation.BlockTimeout, Logger: conf.logger, }, ) var err error - if p.blockCreator, err = blockcreator.New( + + p.blockProcessor = blockprocessor.New( + &blockprocessor.Config{ + BlockOneQueueBarrier: p.blockOneQueueBarrier, + BlockStore: conf.blockStore, + ProvenanceStore: conf.provenanceStore, + StateTrieStore: conf.stateTrieStore, + DB: conf.db, + Logger: conf.logger, + }, + ) + + ledgerHeight, err := conf.blockStore.Height() + if err != nil { + return nil, err + } + if ledgerHeight == 0 { + p.logger.Info("Bootstrapping the ledger and database") + tx, err := PrepareBootstrapConfigTx(conf.config) + if err != nil { + return nil, err + } + bootBlock, err := blockcreator.BootstrapBlock(tx) + if err != nil { + return nil, err + } + if err = p.blockProcessor.Bootstrap(bootBlock); err != nil { + return nil, err + } + } + + p.blockCreator, err = blockcreator.New( &blockcreator.Config{ TxBatchQueue: p.txBatchQueue, Logger: conf.logger, BlockStore: conf.blockStore, }, - ); err != nil { + ) + if err != nil { + return nil, err + } + + p.peerTransport = comm.NewHTTPTransport(&comm.Config{ + LocalConf: localConfig, + Logger: conf.logger, + }) + + clusterConfig, _, err := conf.db.GetConfig() + if err != nil { + return nil, err + } + conf.logger.Debugf("cluster config: %+v", clusterConfig) + if err = p.peerTransport.UpdateClusterConfig(clusterConfig); err != nil { return nil, err } p.blockReplicator = replication.NewBlockReplicator( &replication.Config{ + LocalConf: localConfig, + ClusterConfig: clusterConfig, + Transport: p.peerTransport, BlockOneQueueBarrier: p.blockOneQueueBarrier, Logger: conf.logger, }, ) - + if err = p.peerTransport.SetConsensusListener(p.blockReplicator); err != nil { + return nil, err + } p.blockCreator.RegisterReplicator(p.blockReplicator) - p.blockProcessor = blockprocessor.New( - &blockprocessor.Config{ - BlockOneQueueBarrier: p.blockOneQueueBarrier, - BlockStore: conf.blockStore, - ProvenanceStore: conf.provenanceStore, - StateTrieStore: conf.stateTrieStore, - DB: conf.db, - Logger: conf.logger, - }, - ) - - _ = p.blockProcessor.RegisterBlockCommitListener(commitListenerName, p) + if err = p.blockProcessor.RegisterBlockCommitListener(commitListenerName, p); err != nil { + return nil, err + } go p.txReorderer.Start() p.txReorderer.WaitTillStart() @@ -114,7 +157,9 @@ func newTransactionProcessor(conf *txProcessorConfig) (*transactionProcessor, er go p.blockCreator.Start() p.blockCreator.WaitTillStart() - p.blockReplicator.Start() + p.peerTransport.Start() // Starts internal goroutine + + p.blockReplicator.Start() // Starts internal goroutine go p.blockProcessor.Start() p.blockProcessor.WaitTillStart() @@ -335,3 +380,102 @@ func (s *promise) close() { close(s.receipt) s = nil } + +func PrepareBootstrapConfigTx(conf *config.Configurations) (*types.ConfigTxEnvelope, error) { + certs, err := readCerts(conf) + if err != nil { + return nil, err + } + + inNodes := false + var nodes []*types.NodeConfig + for _, node := range conf.SharedConfig.Nodes { + nc := &types.NodeConfig{ + Id: node.NodeID, + Address: node.Host, + Port: node.Port, + } + if cert, ok := certs.nodeCertificates[node.NodeID]; ok { + nc.Certificate = cert + } else { + return nil, errors.Errorf("Cannot find certificate for node: %s", node.NodeID) + } + nodes = append(nodes, nc) + + if node.NodeID == conf.LocalConfig.Server.Identity.ID { + inNodes = true + } + } + if !inNodes { + return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Nodes: %v", conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Nodes) + } + + clusterConfig := &types.ClusterConfig{ + Nodes: nodes, + Admins: []*types.Admin{ + { + Id: conf.SharedConfig.Admin.ID, + Certificate: certs.adminCert, + }, + }, + CertAuthConfig: certs.caCerts, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: conf.SharedConfig.Consensus.Algorithm, + Members: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Members)), + Observers: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Observers)), + RaftConfig: &types.RaftConfig{ + TickInterval: conf.SharedConfig.Consensus.RaftConfig.TickInterval, + ElectionTicks: conf.SharedConfig.Consensus.RaftConfig.ElectionTicks, + HeartbeatTicks: conf.SharedConfig.Consensus.RaftConfig.HeartbeatTicks, + }, + }, + } + + inMembers := false + for i, m := range conf.SharedConfig.Consensus.Members { + clusterConfig.ConsensusConfig.Members[i] = &types.PeerConfig{ + NodeId: m.NodeId, + RaftId: m.RaftId, + PeerHost: m.PeerHost, + PeerPort: m.PeerPort, + } + if m.NodeId == conf.LocalConfig.Server.Identity.ID { + inMembers = true + } + } + + inObservers := false + for i, m := range conf.SharedConfig.Consensus.Observers { + clusterConfig.ConsensusConfig.Observers[i] = &types.PeerConfig{ + NodeId: m.NodeId, + RaftId: m.RaftId, + PeerHost: m.PeerHost, + PeerPort: m.PeerPort, + } + if m.NodeId == conf.LocalConfig.Server.Identity.ID { + inObservers = true + } + } + + if !inMembers && !inObservers { + return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Consensus Members or Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + if inObservers && inMembers { + return nil, errors.Errorf("local Server.Identity.ID [%s] cannot be in SharedConfig.Consensus both Members and Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + // TODO add support for observers, see issue: https://github.ibm.com/blockchaindb/server/issues/403 + if inObservers { + return nil, errors.Errorf("not supported yet: local Server.Identity.ID [%s] is in SharedConfig.Consensus.Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + + return &types.ConfigTxEnvelope{ + Payload: &types.ConfigTx{ + TxId: uuid.New().String(), + NewConfig: clusterConfig, + }, + // TODO: we can make the node itself sign the transaction + }, nil +} diff --git a/internal/bcdb/transaction_processor_test.go b/internal/bcdb/transaction_processor_test.go index 00b65a9d..cf0542b4 100644 --- a/internal/bcdb/transaction_processor_test.go +++ b/internal/bcdb/transaction_processor_test.go @@ -45,7 +45,7 @@ type txProcessorTestEnv struct { cleanup func() } -func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { +func newTxProcessorTestEnv(t *testing.T, cryptoDir string, conf *config.Configurations) *txProcessorTestEnv { dir, err := ioutil.TempDir("/tmp", "transactionProcessor") require.NoError(t, err) @@ -117,20 +117,13 @@ func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { userCert, userSigner := testutils.LoadTestClientCrypto(t, cryptoDir, "testUser") - nodeID := "bdb-node-1" - txProcConf := &txProcessorConfig{ - nodeID: nodeID, - db: db, - blockStore: blockStore, - provenanceStore: provenanceStore, - stateTrieStore: stateTrieStore, - txQueueLength: 100, - txBatchQueueLength: 100, - blockQueueLength: 100, - maxTxCountPerBatch: 1, - batchTimeout: 50 * time.Millisecond, - logger: logger, + config: conf, + db: db, + blockStore: blockStore, + provenanceStore: provenanceStore, + stateTrieStore: stateTrieStore, + logger: logger, } txProcessor, err := newTransactionProcessor(txProcConf) require.NoError(t, err) @@ -175,43 +168,10 @@ func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { } } -func setupTxProcessor(t *testing.T, env *txProcessorTestEnv, conf *config.Configurations, dbName string) { - configTx, err := prepareConfigTx(conf) - require.NoError(t, err) - - txHash, err := calculateTxHash(configTx, &types.ValidationInfo{Flag: types.Flag_VALID}) - require.NoError(t, err) - txMerkelRootHash, err := crypto.ConcatenateHashes(txHash, nil) - require.NoError(t, err) - - stateTrie, err := mptrie.NewTrie(nil, env.stateTrieStore) - require.NoError(t, err) - stateTrieRoot := applyTxsOnTrie(t, env, configTx, stateTrie) - - expectedRespPayload := &types.TxReceiptResponse{ - Receipt: &types.TxReceipt{ - Header: &types.BlockHeader{ - BaseHeader: &types.BlockHeaderBase{ - Number: 1, - LastCommittedBlockNum: 0, - }, - TxMerkelTreeRootHash: txMerkelRootHash, - StateMerkelTreeRootHash: stateTrieRoot, - ValidationInfo: []*types.ValidationInfo{ - { - Flag: types.Flag_VALID, - }, - }, - }, - TxIndex: 0, - }, - } - - resp, err := env.txProcessor.submitTransaction(configTx, 5*time.Second) +func setupTxProcessor(t *testing.T, env *txProcessorTestEnv, dbName string) { + h, err := env.txProcessor.blockStore.Height() require.NoError(t, err) - - require.True(t, proto.Equal(expectedRespPayload, resp)) - require.True(t, env.txProcessor.pendingTxs.isEmpty()) + require.Equal(t, uint64(1), h) user := &types.User{ Id: env.userID, @@ -258,10 +218,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("commit a data transaction asynchronously", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) tx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -365,10 +325,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("commit a data transaction synchronously", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) tx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -467,10 +427,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with the already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) dataTx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -501,10 +461,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with either pending or already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) dbTx := testutils.SignedDBAdministrationTxEnvelope(t, env.userSigner, &types.DBAdministrationTx{ UserId: "testUser", @@ -539,10 +499,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with either pending or already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) resp, err := env.txProcessor.submitTransaction([]byte("hello"), 0) require.EqualError(t, err, "unexpected transaction type") diff --git a/internal/blockcreator/blockcreator.go b/internal/blockcreator/blockcreator.go index 12cd2e75..586833c7 100644 --- a/internal/blockcreator/blockcreator.go +++ b/internal/blockcreator/blockcreator.go @@ -67,6 +67,21 @@ func (b *BlockCreator) RegisterReplicator(blockReplicator Replicator) { b.blockReplicator = blockReplicator } +func BootstrapBlock(tx *types.ConfigTxEnvelope) (*types.Block, error) { + block := &types.Block{ + Header: &types.BlockHeader{ + BaseHeader: &types.BlockHeaderBase{ + Number: 1, + }, + }, + Payload: &types.Block_ConfigTxEnvelope{ + ConfigTxEnvelope: tx, + }, + } + + return block, nil +} + // Start runs the block assembler in an infinite loop func (b *BlockCreator) Start() { defer close(b.stopped) diff --git a/internal/blockprocessor/processor.go b/internal/blockprocessor/processor.go index a1d11a2a..9b36e69f 100644 --- a/internal/blockprocessor/processor.go +++ b/internal/blockprocessor/processor.go @@ -56,6 +56,16 @@ func New(conf *Config) *BlockProcessor { } } +// Bootstrap initializes the ledger and database with the first block, which contains a config transaction. +// This block is a.k.a. the "genesis block". +func (b *BlockProcessor) Bootstrap(configBlock *types.Block) error { + if err := b.initAndRecoverStateTrieIfNeeded(); err != nil { + return errors.WithMessage(err, "error while recovering node state trie") + } + + return b.validateAndCommit(configBlock) +} + // Start starts the validator and committer func (b *BlockProcessor) Start() { b.logger.Debug("starting the block processor") @@ -88,36 +98,19 @@ func (b *BlockProcessor) Start() { } block := blockData.(*types.Block) - b.logger.Debugf("validating and committing block %d", block.GetHeader().GetBaseHeader().GetNumber()) - validationInfo, err := b.validator.validateBlock(block) - if err != nil { - panic(err) - } - - block.Header.ValidationInfo = validationInfo - - if err := b.blockStore.AddSkipListLinks(block); err != nil { - panic(err) - } - - root, err := mtree.BuildTreeForBlockTx(block) - if err != nil { + if err = b.validateAndCommit(block); err != nil { panic(err) } - block.Header.TxMerkelTreeRootHash = root.Hash() - - if err = b.committer.commitBlock(block); err != nil { - panic(err) - } - - b.logger.Debugf("validated and committed block %d\n", block.GetHeader().GetBaseHeader().GetNumber()) - //TODO detect config changes that affect the replication component and return an appropriate non-nil object + //TODO Detect config changes that affect the replication component and return an appropriate non-nil object // to instruct it to reconfigure itself. See issues: // https://github.ibm.com/blockchaindb/server/issues/396 // https://github.ibm.com/blockchaindb/server/issues/413 var reConfig interface{} - + switch block.Payload.(type) { + case *types.Block_ConfigTxEnvelope: + reConfig = block.GetConfigTxEnvelope().GetPayload().GetNewConfig() + } // The replication layer go-routine is blocked until after commit, and is released by calling Reply(). // The post-commit listeners are called after the replication layer go-routine is released. This is an // optimization, as post-commit processing can proceed in parallel with the replication go-routine handling @@ -136,6 +129,36 @@ func (b *BlockProcessor) Start() { } } +func (b *BlockProcessor) validateAndCommit(block *types.Block) error { + b.logger.Debugf("validating and committing block %d", block.GetHeader().GetBaseHeader().GetNumber()) + validationInfo, err := b.validator.validateBlock(block) + if err != nil { + if block.GetHeader().GetBaseHeader().GetNumber() > 1 { + panic(err) + } + return err + } + + block.Header.ValidationInfo = validationInfo + + if err = b.blockStore.AddSkipListLinks(block); err != nil { + panic(err) + } + + root, err := mtree.BuildTreeForBlockTx(block) + if err != nil { + panic(err) + } + block.Header.TxMerkelTreeRootHash = root.Hash() + + if err = b.committer.commitBlock(block); err != nil { + panic(err) + } + + b.logger.Debugf("validated and committed block %d\n", block.GetHeader().GetBaseHeader().GetNumber()) + return err +} + // WaitTillStart waits till the block processor is started func (b *BlockProcessor) WaitTillStart() { <-b.started diff --git a/internal/blockprocessor/processor_test.go b/internal/blockprocessor/processor_test.go index 0bfa1a4f..2e0c110d 100644 --- a/internal/blockprocessor/processor_test.go +++ b/internal/blockprocessor/processor_test.go @@ -41,6 +41,7 @@ type testEnv struct { userID string userCert *x509.Certificate userSigner crypto.Signer + genesisConfig *types.ClusterConfig genesisBlock *types.Block cleanup func(bool) } @@ -130,6 +131,46 @@ func newTestEnv(t *testing.T) *testEnv { Logger: logger, }) + genesisConfig := &types.ClusterConfig{ + Nodes: []*types.NodeConfig{ + { + Id: "node1", + Address: "127.0.0.1", + Port: 6090, + Certificate: nodeCert.Raw, + }, + }, + Admins: []*types.Admin{ + { + Id: "admin1", + Certificate: adminCert.Raw, + }, + }, + CertAuthConfig: &types.CAConfig{ + Roots: [][]byte{caCert.Raw}, + }, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: "raft", + Members: []*types.PeerConfig{ + { + NodeId: "node1", + RaftId: 1, + PeerHost: "127.0.0.1", + PeerPort: 7090, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }, + }, + RaftConfig: &types.RaftConfig{ + TickInterval: "100ms", + ElectionTicks: 100, + HeartbeatTicks: 10, + }, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }, + } genesisBlock := &types.Block{ Header: &types.BlockHeader{ BaseHeader: &types.BlockHeaderBase{ @@ -139,46 +180,7 @@ func newTestEnv(t *testing.T) *testEnv { Payload: &types.Block_ConfigTxEnvelope{ ConfigTxEnvelope: &types.ConfigTxEnvelope{ Payload: &types.ConfigTx{ - NewConfig: &types.ClusterConfig{ - Nodes: []*types.NodeConfig{ - { - Id: "node1", - Address: "127.0.0.1", - Port: 6090, - Certificate: nodeCert.Raw, - }, - }, - Admins: []*types.Admin{ - { - Id: "admin1", - Certificate: adminCert.Raw, - }, - }, - CertAuthConfig: &types.CAConfig{ - Roots: [][]byte{caCert.Raw}, - }, - ConsensusConfig: &types.ConsensusConfig{ - Algorithm: "raft", - Members: []*types.PeerConfig{ - { - NodeId: "node1", - RaftId: 1, - PeerHost: "127.0.0.1", - PeerPort: 7090, - XXX_unrecognized: nil, - XXX_sizecache: 0, - }, - }, - RaftConfig: &types.RaftConfig{ - TickInterval: "100ms", - ElectionTicks: 100, - HeartbeatTicks: 10, - }, - XXX_NoUnkeyedLiteral: struct{}{}, - XXX_unrecognized: nil, - XXX_sizecache: 0, - }, - }, + NewConfig: genesisConfig, }, }, }, @@ -215,6 +217,7 @@ func newTestEnv(t *testing.T) *testEnv { userID: "testUser", userCert: userCert, userSigner: userSigner, + genesisConfig: genesisConfig, genesisBlock: genesisBlock, cleanup: cleanup, } @@ -228,7 +231,8 @@ func newTestEnv(t *testing.T) *testEnv { func setup(t *testing.T, env *testEnv) { reply, err := env.blockProcessor.blockOneQueueBarrier.EnqueueWait(env.genesisBlock) require.NoError(t, err) - require.Nil(t, reply) // May not be nil when we implement dynamic config + require.NotNil(t, reply) + require.Equal(t, env.genesisConfig, reply) assertConfigHasCommitted := func() bool { exist, err := env.blockProcessor.validator.configTxValidator.identityQuerier.DoesUserExist("admin1") diff --git a/internal/comm/httptransport.go b/internal/comm/httptransport.go index 4b83bed6..7c1a339f 100644 --- a/internal/comm/httptransport.go +++ b/internal/comm/httptransport.go @@ -85,7 +85,22 @@ func (p *HTTPTransport) UpdateClusterConfig(clusterConfig *types.ClusterConfig) return errors.New("dynamic re-config of http transport is not supported yet") } + //TODO export to a config util + var foundRaftID bool + for _, member := range clusterConfig.ConsensusConfig.Members { + if member.NodeId == p.localConf.Server.Identity.ID { + p.raftID = member.RaftId + foundRaftID = true + break + } + } + if !foundRaftID { + return errors.Errorf("local NodeID '%s' is not in Consensus members: %v", + p.localConf.Server.Identity.ID, clusterConfig.ConsensusConfig.Members) + } + p.clusterConfig = clusterConfig + return nil } @@ -101,20 +116,6 @@ func (p *HTTPTransport) Start() error { p.logger.Panic("Must update ClusterConfig before Start()") } - //TODO export to a config util - var foundRaftID bool - for _, member := range p.clusterConfig.ConsensusConfig.Members { - if member.NodeId == p.localConf.Server.Identity.ID { - p.raftID = member.RaftId - foundRaftID = true - break - } - } - if !foundRaftID { - return errors.Errorf("local NodeID '%s' is not in Consensus members: %v", - p.localConf.Server.Identity.ID, p.clusterConfig.ConsensusConfig.Members) - } - netConf := p.localConf.Replication.Network addr := fmt.Sprintf("%s:%d", netConf.Address, netConf.Port) netListener, err := net.Listen("tcp", addr) diff --git a/internal/replication/blockreplicator.go b/internal/replication/blockreplicator.go index 101def70..2c4f2701 100644 --- a/internal/replication/blockreplicator.go +++ b/internal/replication/blockreplicator.go @@ -4,27 +4,42 @@ package replication import ( + "context" + "errors" "sync" + "github.com/IBM-Blockchain/bcdb-server/config" + "github.com/IBM-Blockchain/bcdb-server/internal/comm" ierrors "github.com/IBM-Blockchain/bcdb-server/internal/errors" "github.com/IBM-Blockchain/bcdb-server/internal/queue" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" + "github.com/IBM-Blockchain/bcdb-server/pkg/types" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" ) type BlockReplicator struct { + localConf *config.LocalConfiguration raftCh chan interface{} // TODO a placeholder for the Raft pipeline oneQueueBarrier *queue.OneQueueBarrier // Synchronizes the block-replication deliver with the block-processor commit + transport *comm.HTTPTransport stopCh chan struct{} stopOnce sync.Once doneCh chan struct{} + mutex sync.Mutex + clusterConfig *types.ClusterConfig + logger *logger.SugarLogger } // Config holds the configuration information required to initialize the block replicator. type Config struct { //TODO just a skeleton + LocalConf *config.LocalConfiguration + ClusterConfig *types.ClusterConfig + Transport *comm.HTTPTransport BlockOneQueueBarrier *queue.OneQueueBarrier Logger *logger.SugarLogger } @@ -34,10 +49,13 @@ func NewBlockReplicator(conf *Config) *BlockReplicator { //TODO just a skeleton br := &BlockReplicator{ + localConf: conf.LocalConf, raftCh: make(chan interface{}, 10), // TODO a placeholder for the Raft pipeline oneQueueBarrier: conf.BlockOneQueueBarrier, stopCh: make(chan struct{}), doneCh: make(chan struct{}), + clusterConfig: conf.ClusterConfig, + transport: conf.Transport, logger: conf.Logger, } @@ -70,14 +88,14 @@ func (br *BlockReplicator) Start() { func (br *BlockReplicator) run(readyCh chan<- struct{}) { defer close(br.doneCh) - br.logger.Info("starting the block replicator") + br.logger.Info("Starting the block replicator") close(readyCh) Replicator_Loop: for { select { case <-br.stopCh: - br.logger.Info("stopping block replicator") + br.logger.Info("Stopping block replicator") return case blockToCommit := <-br.raftCh: @@ -89,14 +107,18 @@ Replicator_Loop: break Replicator_Loop } - // A non-nil reply is an indication that the last block was a valid config that applies to the replication - // component. + // A non-nil reply is an indication that the last block was a valid config that applies + // to the replication component. if reConfig != nil { - br.logger.Infof("New config committed, going to apply to block replicator: %v", reConfig) - // TODO + clusterConfig := reConfig.(*types.ClusterConfig) + if err = br.updateClusterConfig(clusterConfig); err != nil { + br.logger.Panicf("Failed to update ClusterConfig: %s", err) + } } } } + + br.logger.Info("Exiting the block replicator") } // Close signals the internal go-routine to stop and waits for it to exit. @@ -115,3 +137,31 @@ func (br *BlockReplicator) Close() (err error) { return err } + +func (br *BlockReplicator) updateClusterConfig(clusterConfig *types.ClusterConfig) error { + br.logger.Infof("New cluster config committed, going to apply to block replicator: %+v", clusterConfig) + + //TODO dynamic re-config, update transport config, etc + + return errors.New("dynamic re-config of ClusterConfig not supported yet") +} + +func (br *BlockReplicator) Process(ctx context.Context, m raftpb.Message) error { + // see: rafthttp.RAFT + //TODO + return nil +} + +func (br *BlockReplicator) IsIDRemoved(id uint64) bool { + // see: rafthttp.RAFT + //TODO + return false +} +func (br *BlockReplicator) ReportUnreachable(id uint64) { + // see: rafthttp.RAFT + //TODO +} +func (br *BlockReplicator) ReportSnapshot(id uint64, status raft.SnapshotStatus) { + // see: rafthttp.RAFT + //TODO +} diff --git a/internal/replication/blockreplicator_test.go b/internal/replication/blockreplicator_test.go index 0ff3639a..87da547f 100644 --- a/internal/replication/blockreplicator_test.go +++ b/internal/replication/blockreplicator_test.go @@ -4,6 +4,7 @@ package replication_test import ( + "github.com/IBM-Blockchain/bcdb-server/pkg/types" "sync" "testing" @@ -14,6 +15,28 @@ import ( "github.com/stretchr/testify/require" ) +var clusterConfig1node = &types.ClusterConfig{ + Nodes: []*types.NodeConfig{{ + Id: "node1", + Address: "127.0.0.1", + Port: 9090, + }}, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: "raft", + Members: []*types.PeerConfig{{ + NodeId: "node1", + RaftId: 1, + PeerHost: "127.0.0.1", + PeerPort: 9091, + }}, + RaftConfig: &types.RaftConfig{ + TickInterval: "100ms", + ElectionTicks: 100, + HeartbeatTicks: 10, + }, + }, +} + func TestBlockReplicator_StartClose(t *testing.T) { lg := testLogger(t, "debug") qBarrier := queue.NewOneQueueBarrier(lg) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2b6ba2b9..4c71dfb8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -12,7 +12,6 @@ import ( "github.com/IBM-Blockchain/bcdb-server/internal/httphandler" "github.com/IBM-Blockchain/bcdb-server/pkg/constants" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" - "github.com/IBM-Blockchain/bcdb-server/pkg/types" "github.com/pkg/errors" ) @@ -40,7 +39,7 @@ func New(conf *config.Configurations) (*BCDBHTTPServer, error) { return nil, err } - db, err := bcdb.NewDB(conf.LocalConfig, lg) + db, err := bcdb.NewDB(conf, lg) if err != nil { return nil, errors.Wrap(err, "error while creating the database object") } @@ -73,22 +72,12 @@ func New(conf *config.Configurations) (*BCDBHTTPServer, error) { // Start starts the server func (s *BCDBHTTPServer) Start() error { - blockHeight, err := s.db.LedgerHeight() - if err != nil { + if blockHeight, err := s.db.LedgerHeight(); err != nil { return err - } - if blockHeight == 0 { - s.logger.Infof("Bootstrapping DB for the first time") - resp, err := s.db.BootstrapDB(s.conf) - if err != nil { - return errors.Wrap(err, "error while preparing and committing config transaction") - } - - txReceipt := resp.GetResponse().GetReceipt() - valInfo := txReceipt.GetHeader().GetValidationInfo()[txReceipt.TxIndex] - if valInfo.Flag != types.Flag_VALID { - return errors.Errorf("config transaction was not committed due to invalidation [" + valInfo.ReasonIfInvalid + "]") - } + } else if blockHeight == 0 { + return errors.New("ledger height == 0, bootstrap failed") + } else { + s.logger.Infof("Server starting at ledger height [%d]", blockHeight) } go s.serveRequests(s.listen) diff --git a/test/setup/server_setup.go b/test/setup/server_setup.go index 64c87091..884e0b57 100644 --- a/test/setup/server_setup.go +++ b/test/setup/server_setup.go @@ -189,7 +189,7 @@ func (s *Server) createConfigFile() error { " walDir: " + filepath.Join(s.configDir, "etcdraft", "wal") + "\n" + //TODO create path " snapDir: " + filepath.Join(s.configDir, "etcdraft", "snap") + "\n" + //TODO create path " network:\n" + - " address: 127.0.0.1\n" + + " address: " + s.address + "\n" + " port: " + strconv.FormatInt(int64(s.peerPort), 10) + "\n" + " tls:\n" + //TODO add rest of fields when security is supported " enabled: false\n" +