From 575572c77cd00c4298e2fc49aa71084317e832b0 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Thu, 24 Jun 2021 16:54:53 +0300 Subject: [PATCH] Integrate transport into replicator (#11) Initialize and start comm.HTTPTransport and the BlockReplicator in the transaction processor. Simplify the server bootstrap sequence by pulling it into the transaction processor, which now creates and starts the replicator and transport. No Raft yet, no dynamic config of ClusterConfig. Signed-off-by: Yoav Tock --- internal/bcdb/db.go | 138 +----------- internal/bcdb/mocks/db.go | 37 +--- internal/bcdb/transaction_processor.go | 208 ++++++++++++++++--- internal/bcdb/transaction_processor_test.go | 80 ++----- internal/blockcreator/blockcreator.go | 15 ++ internal/blockprocessor/processor.go | 69 ++++-- internal/blockprocessor/processor_test.go | 86 ++++---- internal/comm/httptransport.go | 29 +-- internal/replication/blockreplicator.go | 62 +++++- internal/replication/blockreplicator_test.go | 23 ++ pkg/server/server.go | 23 +- test/setup/server_setup.go | 2 +- 12 files changed, 420 insertions(+), 352 deletions(-) diff --git a/internal/bcdb/db.go b/internal/bcdb/db.go index f80243a1..f5343910 100644 --- a/internal/bcdb/db.go +++ b/internal/bcdb/db.go @@ -20,7 +20,6 @@ import ( "github.com/IBM-Blockchain/bcdb-server/pkg/crypto" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" "github.com/IBM-Blockchain/bcdb-server/pkg/types" - "github.com/google/uuid" "github.com/pkg/errors" ) @@ -112,10 +111,6 @@ type DB interface { // timeout error will be returned SubmitTransaction(tx interface{}, timeout time.Duration) (*types.TxReceiptResponseEnvelope, error) - // BootstrapDB given bootstrap configuration initialize database by - // creating required system tables to include database meta data - BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) - // IsDBExists returns true if database with given name is exists otherwise false IsDBExists(name string) bool @@ -138,7 +133,8 @@ type db struct { } // NewDB creates a new database bcdb which handles both the queries and transactions.
-func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB, error) { +func NewDB(conf *config.Configurations, logger *logger.SugarLogger) (DB, error) { + localConf := conf.LocalConfig if localConf.Server.Database.Name != "leveldb" { return nil, errors.New("only leveldb is supported as the state database") } @@ -223,17 +219,12 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB txProcessor, err := newTransactionProcessor( &txProcessorConfig{ - nodeID: localConf.Server.Identity.ID, - db: levelDB, - blockStore: blockStore, - provenanceStore: provenanceStore, - stateTrieStore: stateTrieStore, - txQueueLength: localConf.Server.QueueLength.Transaction, - txBatchQueueLength: localConf.Server.QueueLength.ReorderedTransactionBatch, - blockQueueLength: localConf.Server.QueueLength.Block, - maxTxCountPerBatch: localConf.BlockCreation.MaxTransactionCountPerBlock, - batchTimeout: localConf.BlockCreation.BlockTimeout, - logger: logger, + config: conf, + db: levelDB, + blockStore: blockStore, + provenanceStore: provenanceStore, + stateTrieStore: stateTrieStore, + logger: logger, }, ) if err != nil { @@ -255,20 +246,6 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB }, nil } -// BootstrapDB bootstraps DB with system tables -func (d *db) BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) { - configTx, err := prepareConfigTx(conf) - if err != nil { - return nil, errors.Wrap(err, "failed to prepare and commit a configuration transaction") - } - - resp, err := d.SubmitTransaction(configTx, 30*time.Second) - if err != nil { - return nil, errors.Wrap(err, "error while committing configuration transaction") - } - return resp, nil -} - // LedgerHeight returns ledger height func (d *db) LedgerHeight() (uint64, error) { return d.worldstateQueryProcessor.blockStore.Height() @@ -752,105 +729,6 @@ func (d *db) signature(response interface{}) ([]byte, error) { return d.signer.Sign(responseBytes) } -func prepareConfigTx(conf *config.Configurations) (*types.ConfigTxEnvelope, error) { - certs, err := readCerts(conf) - if err != nil { - return nil, err - } - - inNodes := false - var nodes []*types.NodeConfig - for _, node := range conf.SharedConfig.Nodes { - nc := &types.NodeConfig{ - Id: node.NodeID, - Address: node.Host, - Port: node.Port, - } - if cert, ok := certs.nodeCertificates[node.NodeID]; ok { - nc.Certificate = cert - } else { - return nil, errors.Errorf("Cannot find certificate for node: %s", node.NodeID) - } - nodes = append(nodes, nc) - - if node.NodeID == conf.LocalConfig.Server.Identity.ID { - inNodes = true - } - } - if !inNodes { - return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Nodes: %v", conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Nodes) - } - - clusterConfig := &types.ClusterConfig{ - Nodes: nodes, - Admins: []*types.Admin{ - { - Id: conf.SharedConfig.Admin.ID, - Certificate: certs.adminCert, - }, - }, - CertAuthConfig: certs.caCerts, - ConsensusConfig: &types.ConsensusConfig{ - Algorithm: conf.SharedConfig.Consensus.Algorithm, - Members: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Members)), - Observers: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Observers)), - RaftConfig: &types.RaftConfig{ - TickInterval: conf.SharedConfig.Consensus.RaftConfig.TickInterval, - ElectionTicks: conf.SharedConfig.Consensus.RaftConfig.ElectionTicks, - HeartbeatTicks: conf.SharedConfig.Consensus.RaftConfig.HeartbeatTicks, - }, 
- }, - } - - inMembers := false - for i, m := range conf.SharedConfig.Consensus.Members { - clusterConfig.ConsensusConfig.Members[i] = &types.PeerConfig{ - NodeId: m.NodeId, - RaftId: m.RaftId, - PeerHost: m.PeerHost, - PeerPort: m.PeerPort, - } - if m.NodeId == conf.LocalConfig.Server.Identity.ID { - inMembers = true - } - } - - inObservers := false - for i, m := range conf.SharedConfig.Consensus.Observers { - clusterConfig.ConsensusConfig.Observers[i] = &types.PeerConfig{ - NodeId: m.NodeId, - RaftId: m.RaftId, - PeerHost: m.PeerHost, - PeerPort: m.PeerPort, - } - if m.NodeId == conf.LocalConfig.Server.Identity.ID { - inObservers = true - } - } - - if !inMembers && !inObservers { - return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Consensus Members or Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - if inObservers && inMembers { - return nil, errors.Errorf("local Server.Identity.ID [%s] cannot be in SharedConfig.Consensus both Members and Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - // TODO add support for observers, see issue: https://github.ibm.com/blockchaindb/server/issues/403 - if inObservers { - return nil, errors.Errorf("not supported yet: local Server.Identity.ID [%s] is in SharedConfig.Consensus.Observers: %v", - conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) - } - - return &types.ConfigTxEnvelope{ - Payload: &types.ConfigTx{ - TxId: uuid.New().String(), // TODO: we need to change TxID to string - NewConfig: clusterConfig, - }, - // TODO: we can make the node itself sign the transaction - }, nil -} - type certsInGenesisConfig struct { nodeCertificates map[string][]byte adminCert []byte diff --git a/internal/bcdb/mocks/db.go b/internal/bcdb/mocks/db.go index 704a39fd..5c6558bf 100644 --- a/internal/bcdb/mocks/db.go +++ b/internal/bcdb/mocks/db.go @@ -2,38 +2,19 @@ package mocks -import config "github.com/IBM-Blockchain/bcdb-server/config" -import mock "github.com/stretchr/testify/mock" -import time "time" -import types "github.com/IBM-Blockchain/bcdb-server/pkg/types" -import x509 "crypto/x509" +import ( + time "time" -// DB is an autogenerated mock type for the DB type -type DB struct { - mock.Mock -} + mock "github.com/stretchr/testify/mock" -// BootstrapDB provides a mock function with given fields: conf -func (_m *DB) BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) { - ret := _m.Called(conf) + types "github.com/IBM-Blockchain/bcdb-server/pkg/types" - var r0 *types.TxReceiptResponseEnvelope - if rf, ok := ret.Get(0).(func(*config.Configurations) *types.TxReceiptResponseEnvelope); ok { - r0 = rf(conf) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.TxReceiptResponseEnvelope) - } - } + x509 "crypto/x509" +) - var r1 error - if rf, ok := ret.Get(1).(func(*config.Configurations) error); ok { - r1 = rf(conf) - } else { - r1 = ret.Error(1) - } - - return r0, r1 +// DB is an autogenerated mock type for the DB type +type DB struct { + mock.Mock } // Close provides a mock function with given fields: diff --git a/internal/bcdb/transaction_processor.go b/internal/bcdb/transaction_processor.go index 7fb68050..ac555246 100644 --- a/internal/bcdb/transaction_processor.go +++ b/internal/bcdb/transaction_processor.go @@ -8,9 +8,11 @@ import ( "sync" "time" + "github.com/IBM-Blockchain/bcdb-server/config" "github.com/IBM-Blockchain/bcdb-server/internal/blockcreator" 
"github.com/IBM-Blockchain/bcdb-server/internal/blockprocessor" "github.com/IBM-Blockchain/bcdb-server/internal/blockstore" + "github.com/IBM-Blockchain/bcdb-server/internal/comm" internalerror "github.com/IBM-Blockchain/bcdb-server/internal/errors" "github.com/IBM-Blockchain/bcdb-server/internal/mptrie" "github.com/IBM-Blockchain/bcdb-server/internal/provenance" @@ -20,6 +22,7 @@ import ( "github.com/IBM-Blockchain/bcdb-server/internal/worldstate" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" "github.com/IBM-Blockchain/bcdb-server/pkg/types" + "github.com/google/uuid" "github.com/pkg/errors" ) @@ -35,6 +38,7 @@ type transactionProcessor struct { txReorderer *txreorderer.TxReorderer blockCreator *blockcreator.BlockCreator blockReplicator *replication.BlockReplicator + peerTransport *comm.HTTPTransport blockProcessor *blockprocessor.BlockProcessor blockStore *blockstore.Store pendingTxs *pendingTxs @@ -43,70 +47,109 @@ type transactionProcessor struct { } type txProcessorConfig struct { - nodeID string - db worldstate.DB - blockStore *blockstore.Store - provenanceStore *provenance.Store - stateTrieStore mptrie.Store - txQueueLength uint32 - txBatchQueueLength uint32 - blockQueueLength uint32 - maxTxCountPerBatch uint32 - batchTimeout time.Duration - logger *logger.SugarLogger + config *config.Configurations + db worldstate.DB + blockStore *blockstore.Store + provenanceStore *provenance.Store + stateTrieStore mptrie.Store + logger *logger.SugarLogger } func newTransactionProcessor(conf *txProcessorConfig) (*transactionProcessor, error) { p := &transactionProcessor{} - p.nodeID = conf.nodeID + localConfig := conf.config.LocalConfig + + p.nodeID = localConfig.Server.Identity.ID p.logger = conf.logger - p.txQueue = queue.New(conf.txQueueLength) - p.txBatchQueue = queue.New(conf.txBatchQueueLength) + p.txQueue = queue.New(localConfig.Server.QueueLength.Transaction) + p.txBatchQueue = queue.New(localConfig.Server.QueueLength.ReorderedTransactionBatch) p.blockOneQueueBarrier = queue.NewOneQueueBarrier(conf.logger) p.txReorderer = txreorderer.New( &txreorderer.Config{ TxQueue: p.txQueue, TxBatchQueue: p.txBatchQueue, - MaxTxCountPerBatch: conf.maxTxCountPerBatch, - BatchTimeout: conf.batchTimeout, + MaxTxCountPerBatch: localConfig.BlockCreation.MaxTransactionCountPerBlock, + BatchTimeout: localConfig.BlockCreation.BlockTimeout, Logger: conf.logger, }, ) var err error - if p.blockCreator, err = blockcreator.New( + + p.blockProcessor = blockprocessor.New( + &blockprocessor.Config{ + BlockOneQueueBarrier: p.blockOneQueueBarrier, + BlockStore: conf.blockStore, + ProvenanceStore: conf.provenanceStore, + StateTrieStore: conf.stateTrieStore, + DB: conf.db, + Logger: conf.logger, + }, + ) + + ledgerHeight, err := conf.blockStore.Height() + if err != nil { + return nil, err + } + if ledgerHeight == 0 { + p.logger.Info("Bootstrapping the ledger and database") + tx, err := PrepareBootstrapConfigTx(conf.config) + if err != nil { + return nil, err + } + bootBlock, err := blockcreator.BootstrapBlock(tx) + if err != nil { + return nil, err + } + if err = p.blockProcessor.Bootstrap(bootBlock); err != nil { + return nil, err + } + } + + p.blockCreator, err = blockcreator.New( &blockcreator.Config{ TxBatchQueue: p.txBatchQueue, Logger: conf.logger, BlockStore: conf.blockStore, }, - ); err != nil { + ) + if err != nil { + return nil, err + } + + p.peerTransport = comm.NewHTTPTransport(&comm.Config{ + LocalConf: localConfig, + Logger: conf.logger, + }) + + clusterConfig, _, err := conf.db.GetConfig() + if err != 
nil { + return nil, err + } + conf.logger.Debugf("cluster config: %+v", clusterConfig) + if err = p.peerTransport.UpdateClusterConfig(clusterConfig); err != nil { return nil, err } p.blockReplicator = replication.NewBlockReplicator( &replication.Config{ + LocalConf: localConfig, + ClusterConfig: clusterConfig, + Transport: p.peerTransport, BlockOneQueueBarrier: p.blockOneQueueBarrier, Logger: conf.logger, }, ) - + if err = p.peerTransport.SetConsensusListener(p.blockReplicator); err != nil { + return nil, err + } p.blockCreator.RegisterReplicator(p.blockReplicator) - p.blockProcessor = blockprocessor.New( - &blockprocessor.Config{ - BlockOneQueueBarrier: p.blockOneQueueBarrier, - BlockStore: conf.blockStore, - ProvenanceStore: conf.provenanceStore, - StateTrieStore: conf.stateTrieStore, - DB: conf.db, - Logger: conf.logger, - }, - ) - - _ = p.blockProcessor.RegisterBlockCommitListener(commitListenerName, p) + if err = p.blockProcessor.RegisterBlockCommitListener(commitListenerName, p); err != nil { + return nil, err + } go p.txReorderer.Start() p.txReorderer.WaitTillStart() @@ -114,7 +157,9 @@ func newTransactionProcessor(conf *txProcessorConfig) (*transactionProcessor, er go p.blockCreator.Start() p.blockCreator.WaitTillStart() - p.blockReplicator.Start() + p.peerTransport.Start() // Starts internal goroutine + + p.blockReplicator.Start() // Starts internal goroutine go p.blockProcessor.Start() p.blockProcessor.WaitTillStart() @@ -335,3 +380,102 @@ func (s *promise) close() { close(s.receipt) s = nil } + +func PrepareBootstrapConfigTx(conf *config.Configurations) (*types.ConfigTxEnvelope, error) { + certs, err := readCerts(conf) + if err != nil { + return nil, err + } + + inNodes := false + var nodes []*types.NodeConfig + for _, node := range conf.SharedConfig.Nodes { + nc := &types.NodeConfig{ + Id: node.NodeID, + Address: node.Host, + Port: node.Port, + } + if cert, ok := certs.nodeCertificates[node.NodeID]; ok { + nc.Certificate = cert + } else { + return nil, errors.Errorf("Cannot find certificate for node: %s", node.NodeID) + } + nodes = append(nodes, nc) + + if node.NodeID == conf.LocalConfig.Server.Identity.ID { + inNodes = true + } + } + if !inNodes { + return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Nodes: %v", conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Nodes) + } + + clusterConfig := &types.ClusterConfig{ + Nodes: nodes, + Admins: []*types.Admin{ + { + Id: conf.SharedConfig.Admin.ID, + Certificate: certs.adminCert, + }, + }, + CertAuthConfig: certs.caCerts, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: conf.SharedConfig.Consensus.Algorithm, + Members: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Members)), + Observers: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Observers)), + RaftConfig: &types.RaftConfig{ + TickInterval: conf.SharedConfig.Consensus.RaftConfig.TickInterval, + ElectionTicks: conf.SharedConfig.Consensus.RaftConfig.ElectionTicks, + HeartbeatTicks: conf.SharedConfig.Consensus.RaftConfig.HeartbeatTicks, + }, + }, + } + + inMembers := false + for i, m := range conf.SharedConfig.Consensus.Members { + clusterConfig.ConsensusConfig.Members[i] = &types.PeerConfig{ + NodeId: m.NodeId, + RaftId: m.RaftId, + PeerHost: m.PeerHost, + PeerPort: m.PeerPort, + } + if m.NodeId == conf.LocalConfig.Server.Identity.ID { + inMembers = true + } + } + + inObservers := false + for i, m := range conf.SharedConfig.Consensus.Observers { + clusterConfig.ConsensusConfig.Observers[i] = &types.PeerConfig{ + 
NodeId: m.NodeId, + RaftId: m.RaftId, + PeerHost: m.PeerHost, + PeerPort: m.PeerPort, + } + if m.NodeId == conf.LocalConfig.Server.Identity.ID { + inObservers = true + } + } + + if !inMembers && !inObservers { + return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Consensus Members or Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + if inObservers && inMembers { + return nil, errors.Errorf("local Server.Identity.ID [%s] cannot be in SharedConfig.Consensus both Members and Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + // TODO add support for observers, see issue: https://github.ibm.com/blockchaindb/server/issues/403 + if inObservers { + return nil, errors.Errorf("not supported yet: local Server.Identity.ID [%s] is in SharedConfig.Consensus.Observers: %v", + conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus) + } + + return &types.ConfigTxEnvelope{ + Payload: &types.ConfigTx{ + TxId: uuid.New().String(), + NewConfig: clusterConfig, + }, + // TODO: we can make the node itself sign the transaction + }, nil +} diff --git a/internal/bcdb/transaction_processor_test.go b/internal/bcdb/transaction_processor_test.go index 00b65a9d..cf0542b4 100644 --- a/internal/bcdb/transaction_processor_test.go +++ b/internal/bcdb/transaction_processor_test.go @@ -45,7 +45,7 @@ type txProcessorTestEnv struct { cleanup func() } -func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { +func newTxProcessorTestEnv(t *testing.T, cryptoDir string, conf *config.Configurations) *txProcessorTestEnv { dir, err := ioutil.TempDir("/tmp", "transactionProcessor") require.NoError(t, err) @@ -117,20 +117,13 @@ func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { userCert, userSigner := testutils.LoadTestClientCrypto(t, cryptoDir, "testUser") - nodeID := "bdb-node-1" - txProcConf := &txProcessorConfig{ - nodeID: nodeID, - db: db, - blockStore: blockStore, - provenanceStore: provenanceStore, - stateTrieStore: stateTrieStore, - txQueueLength: 100, - txBatchQueueLength: 100, - blockQueueLength: 100, - maxTxCountPerBatch: 1, - batchTimeout: 50 * time.Millisecond, - logger: logger, + config: conf, + db: db, + blockStore: blockStore, + provenanceStore: provenanceStore, + stateTrieStore: stateTrieStore, + logger: logger, } txProcessor, err := newTransactionProcessor(txProcConf) require.NoError(t, err) @@ -175,43 +168,10 @@ func newTxProcessorTestEnv(t *testing.T, cryptoDir string) *txProcessorTestEnv { } } -func setupTxProcessor(t *testing.T, env *txProcessorTestEnv, conf *config.Configurations, dbName string) { - configTx, err := prepareConfigTx(conf) - require.NoError(t, err) - - txHash, err := calculateTxHash(configTx, &types.ValidationInfo{Flag: types.Flag_VALID}) - require.NoError(t, err) - txMerkelRootHash, err := crypto.ConcatenateHashes(txHash, nil) - require.NoError(t, err) - - stateTrie, err := mptrie.NewTrie(nil, env.stateTrieStore) - require.NoError(t, err) - stateTrieRoot := applyTxsOnTrie(t, env, configTx, stateTrie) - - expectedRespPayload := &types.TxReceiptResponse{ - Receipt: &types.TxReceipt{ - Header: &types.BlockHeader{ - BaseHeader: &types.BlockHeaderBase{ - Number: 1, - LastCommittedBlockNum: 0, - }, - TxMerkelTreeRootHash: txMerkelRootHash, - StateMerkelTreeRootHash: stateTrieRoot, - ValidationInfo: []*types.ValidationInfo{ - { - Flag: types.Flag_VALID, - }, - }, - }, - TxIndex: 0, - }, - } - - resp, err := 
env.txProcessor.submitTransaction(configTx, 5*time.Second) +func setupTxProcessor(t *testing.T, env *txProcessorTestEnv, dbName string) { + h, err := env.txProcessor.blockStore.Height() require.NoError(t, err) - - require.True(t, proto.Equal(expectedRespPayload, resp)) - require.True(t, env.txProcessor.pendingTxs.isEmpty()) + require.Equal(t, uint64(1), h) user := &types.User{ Id: env.userID, @@ -258,10 +218,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("commit a data transaction asynchronously", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) tx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -365,10 +325,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("commit a data transaction synchronously", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) tx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -467,10 +427,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with the already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) dataTx := testutils.SignedDataTxEnvelope(t, []crypto.Signer{env.userSigner}, &types.DataTx{ MustSignUserIds: []string{"testUser"}, @@ -501,10 +461,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with either pending or already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) dbTx := testutils.SignedDBAdministrationTxEnvelope(t, env.userSigner, &types.DBAdministrationTx{ UserId: "testUser", @@ -539,10 +499,10 @@ func TestTransactionProcessor(t *testing.T) { t.Run("duplicate txID with either pending or already committed transaction", func(t *testing.T) { t.Parallel() - env := newTxProcessorTestEnv(t, cryptoDir) + env := newTxProcessorTestEnv(t, cryptoDir, conf) defer env.cleanup() - setupTxProcessor(t, env, conf, worldstate.DefaultDBName) + setupTxProcessor(t, env, worldstate.DefaultDBName) resp, err := env.txProcessor.submitTransaction([]byte("hello"), 0) require.EqualError(t, err, "unexpected transaction type") diff --git a/internal/blockcreator/blockcreator.go b/internal/blockcreator/blockcreator.go index 12cd2e75..586833c7 100644 --- a/internal/blockcreator/blockcreator.go +++ b/internal/blockcreator/blockcreator.go @@ -67,6 +67,21 @@ func (b *BlockCreator) RegisterReplicator(blockReplicator Replicator) { b.blockReplicator = blockReplicator } +func BootstrapBlock(tx *types.ConfigTxEnvelope) (*types.Block, error) { + block := &types.Block{ + Header: &types.BlockHeader{ + BaseHeader: &types.BlockHeaderBase{ + Number: 1, + }, + }, + Payload: 
&types.Block_ConfigTxEnvelope{ + ConfigTxEnvelope: tx, + }, + } + + return block, nil +} + // Start runs the block assembler in an infinite loop func (b *BlockCreator) Start() { defer close(b.stopped) diff --git a/internal/blockprocessor/processor.go b/internal/blockprocessor/processor.go index a1d11a2a..9b36e69f 100644 --- a/internal/blockprocessor/processor.go +++ b/internal/blockprocessor/processor.go @@ -56,6 +56,16 @@ func New(conf *Config) *BlockProcessor { } } +// Bootstrap initializes the ledger and database with the first block, which contains a config transaction. +// This block is a.k.a. the "genesis block". +func (b *BlockProcessor) Bootstrap(configBlock *types.Block) error { + if err := b.initAndRecoverStateTrieIfNeeded(); err != nil { + return errors.WithMessage(err, "error while recovering node state trie") + } + + return b.validateAndCommit(configBlock) +} + // Start starts the validator and committer func (b *BlockProcessor) Start() { b.logger.Debug("starting the block processor") @@ -88,36 +98,19 @@ func (b *BlockProcessor) Start() { } block := blockData.(*types.Block) - b.logger.Debugf("validating and committing block %d", block.GetHeader().GetBaseHeader().GetNumber()) - validationInfo, err := b.validator.validateBlock(block) - if err != nil { - panic(err) - } - - block.Header.ValidationInfo = validationInfo - - if err := b.blockStore.AddSkipListLinks(block); err != nil { - panic(err) - } - - root, err := mtree.BuildTreeForBlockTx(block) - if err != nil { + if err = b.validateAndCommit(block); err != nil { panic(err) } - block.Header.TxMerkelTreeRootHash = root.Hash() - - if err = b.committer.commitBlock(block); err != nil { - panic(err) - } - - b.logger.Debugf("validated and committed block %d\n", block.GetHeader().GetBaseHeader().GetNumber()) - //TODO detect config changes that affect the replication component and return an appropriate non-nil object + //TODO Detect config changes that affect the replication component and return an appropriate non-nil object // to instruct it to reconfigure itself. See issues: // https://github.ibm.com/blockchaindb/server/issues/396 // https://github.ibm.com/blockchaindb/server/issues/413 var reConfig interface{} - + switch block.Payload.(type) { + case *types.Block_ConfigTxEnvelope: + reConfig = block.GetConfigTxEnvelope().GetPayload().GetNewConfig() + } // The replication layer go-routine is blocked until after commit, and is released by calling Reply(). // The post-commit listeners are called after the replication layer go-routine is released. 
This is an // optimization, as post-commit processing can proceed in parallel with the replication go-routine handling @@ -136,6 +129,36 @@ func (b *BlockProcessor) Start() { } } +func (b *BlockProcessor) validateAndCommit(block *types.Block) error { + b.logger.Debugf("validating and committing block %d", block.GetHeader().GetBaseHeader().GetNumber()) + validationInfo, err := b.validator.validateBlock(block) + if err != nil { + if block.GetHeader().GetBaseHeader().GetNumber() > 1 { + panic(err) + } + return err + } + + block.Header.ValidationInfo = validationInfo + + if err = b.blockStore.AddSkipListLinks(block); err != nil { + panic(err) + } + + root, err := mtree.BuildTreeForBlockTx(block) + if err != nil { + panic(err) + } + block.Header.TxMerkelTreeRootHash = root.Hash() + + if err = b.committer.commitBlock(block); err != nil { + panic(err) + } + + b.logger.Debugf("validated and committed block %d\n", block.GetHeader().GetBaseHeader().GetNumber()) + return err +} + // WaitTillStart waits till the block processor is started func (b *BlockProcessor) WaitTillStart() { <-b.started diff --git a/internal/blockprocessor/processor_test.go b/internal/blockprocessor/processor_test.go index 0bfa1a4f..2e0c110d 100644 --- a/internal/blockprocessor/processor_test.go +++ b/internal/blockprocessor/processor_test.go @@ -41,6 +41,7 @@ type testEnv struct { userID string userCert *x509.Certificate userSigner crypto.Signer + genesisConfig *types.ClusterConfig genesisBlock *types.Block cleanup func(bool) } @@ -130,6 +131,46 @@ func newTestEnv(t *testing.T) *testEnv { Logger: logger, }) + genesisConfig := &types.ClusterConfig{ + Nodes: []*types.NodeConfig{ + { + Id: "node1", + Address: "127.0.0.1", + Port: 6090, + Certificate: nodeCert.Raw, + }, + }, + Admins: []*types.Admin{ + { + Id: "admin1", + Certificate: adminCert.Raw, + }, + }, + CertAuthConfig: &types.CAConfig{ + Roots: [][]byte{caCert.Raw}, + }, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: "raft", + Members: []*types.PeerConfig{ + { + NodeId: "node1", + RaftId: 1, + PeerHost: "127.0.0.1", + PeerPort: 7090, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }, + }, + RaftConfig: &types.RaftConfig{ + TickInterval: "100ms", + ElectionTicks: 100, + HeartbeatTicks: 10, + }, + XXX_NoUnkeyedLiteral: struct{}{}, + XXX_unrecognized: nil, + XXX_sizecache: 0, + }, + } genesisBlock := &types.Block{ Header: &types.BlockHeader{ BaseHeader: &types.BlockHeaderBase{ @@ -139,46 +180,7 @@ func newTestEnv(t *testing.T) *testEnv { Payload: &types.Block_ConfigTxEnvelope{ ConfigTxEnvelope: &types.ConfigTxEnvelope{ Payload: &types.ConfigTx{ - NewConfig: &types.ClusterConfig{ - Nodes: []*types.NodeConfig{ - { - Id: "node1", - Address: "127.0.0.1", - Port: 6090, - Certificate: nodeCert.Raw, - }, - }, - Admins: []*types.Admin{ - { - Id: "admin1", - Certificate: adminCert.Raw, - }, - }, - CertAuthConfig: &types.CAConfig{ - Roots: [][]byte{caCert.Raw}, - }, - ConsensusConfig: &types.ConsensusConfig{ - Algorithm: "raft", - Members: []*types.PeerConfig{ - { - NodeId: "node1", - RaftId: 1, - PeerHost: "127.0.0.1", - PeerPort: 7090, - XXX_unrecognized: nil, - XXX_sizecache: 0, - }, - }, - RaftConfig: &types.RaftConfig{ - TickInterval: "100ms", - ElectionTicks: 100, - HeartbeatTicks: 10, - }, - XXX_NoUnkeyedLiteral: struct{}{}, - XXX_unrecognized: nil, - XXX_sizecache: 0, - }, - }, + NewConfig: genesisConfig, }, }, }, @@ -215,6 +217,7 @@ func newTestEnv(t *testing.T) *testEnv { userID: "testUser", userCert: userCert, userSigner: userSigner, + genesisConfig: genesisConfig, 
genesisBlock: genesisBlock, cleanup: cleanup, } @@ -228,7 +231,8 @@ func newTestEnv(t *testing.T) *testEnv { func setup(t *testing.T, env *testEnv) { reply, err := env.blockProcessor.blockOneQueueBarrier.EnqueueWait(env.genesisBlock) require.NoError(t, err) - require.Nil(t, reply) // May not be nil when we implement dynamic config + require.NotNil(t, reply) + require.Equal(t, env.genesisConfig, reply) assertConfigHasCommitted := func() bool { exist, err := env.blockProcessor.validator.configTxValidator.identityQuerier.DoesUserExist("admin1") diff --git a/internal/comm/httptransport.go b/internal/comm/httptransport.go index 4b83bed6..7c1a339f 100644 --- a/internal/comm/httptransport.go +++ b/internal/comm/httptransport.go @@ -85,7 +85,22 @@ func (p *HTTPTransport) UpdateClusterConfig(clusterConfig *types.ClusterConfig) return errors.New("dynamic re-config of http transport is not supported yet") } + //TODO export to a config util + var foundRaftID bool + for _, member := range clusterConfig.ConsensusConfig.Members { + if member.NodeId == p.localConf.Server.Identity.ID { + p.raftID = member.RaftId + foundRaftID = true + break + } + } + if !foundRaftID { + return errors.Errorf("local NodeID '%s' is not in Consensus members: %v", + p.localConf.Server.Identity.ID, clusterConfig.ConsensusConfig.Members) + } + p.clusterConfig = clusterConfig + return nil } @@ -101,20 +116,6 @@ func (p *HTTPTransport) Start() error { p.logger.Panic("Must update ClusterConfig before Start()") } - //TODO export to a config util - var foundRaftID bool - for _, member := range p.clusterConfig.ConsensusConfig.Members { - if member.NodeId == p.localConf.Server.Identity.ID { - p.raftID = member.RaftId - foundRaftID = true - break - } - } - if !foundRaftID { - return errors.Errorf("local NodeID '%s' is not in Consensus members: %v", - p.localConf.Server.Identity.ID, p.clusterConfig.ConsensusConfig.Members) - } - netConf := p.localConf.Replication.Network addr := fmt.Sprintf("%s:%d", netConf.Address, netConf.Port) netListener, err := net.Listen("tcp", addr) diff --git a/internal/replication/blockreplicator.go b/internal/replication/blockreplicator.go index 101def70..2c4f2701 100644 --- a/internal/replication/blockreplicator.go +++ b/internal/replication/blockreplicator.go @@ -4,27 +4,42 @@ package replication import ( + "context" + "errors" "sync" + "github.com/IBM-Blockchain/bcdb-server/config" + "github.com/IBM-Blockchain/bcdb-server/internal/comm" ierrors "github.com/IBM-Blockchain/bcdb-server/internal/errors" "github.com/IBM-Blockchain/bcdb-server/internal/queue" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" + "github.com/IBM-Blockchain/bcdb-server/pkg/types" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" ) type BlockReplicator struct { + localConf *config.LocalConfiguration raftCh chan interface{} // TODO a placeholder for the Raft pipeline oneQueueBarrier *queue.OneQueueBarrier // Synchronizes the block-replication deliver with the block-processor commit + transport *comm.HTTPTransport stopCh chan struct{} stopOnce sync.Once doneCh chan struct{} + mutex sync.Mutex + clusterConfig *types.ClusterConfig + logger *logger.SugarLogger } // Config holds the configuration information required to initialize the block replicator. 
type Config struct { //TODO just a skeleton + LocalConf *config.LocalConfiguration + ClusterConfig *types.ClusterConfig + Transport *comm.HTTPTransport BlockOneQueueBarrier *queue.OneQueueBarrier Logger *logger.SugarLogger } @@ -34,10 +49,13 @@ func NewBlockReplicator(conf *Config) *BlockReplicator { //TODO just a skeleton br := &BlockReplicator{ + localConf: conf.LocalConf, raftCh: make(chan interface{}, 10), // TODO a placeholder for the Raft pipeline oneQueueBarrier: conf.BlockOneQueueBarrier, stopCh: make(chan struct{}), doneCh: make(chan struct{}), + clusterConfig: conf.ClusterConfig, + transport: conf.Transport, logger: conf.Logger, } @@ -70,14 +88,14 @@ func (br *BlockReplicator) Start() { func (br *BlockReplicator) run(readyCh chan<- struct{}) { defer close(br.doneCh) - br.logger.Info("starting the block replicator") + br.logger.Info("Starting the block replicator") close(readyCh) Replicator_Loop: for { select { case <-br.stopCh: - br.logger.Info("stopping block replicator") + br.logger.Info("Stopping block replicator") return case blockToCommit := <-br.raftCh: @@ -89,14 +107,18 @@ Replicator_Loop: break Replicator_Loop } - // A non-nil reply is an indication that the last block was a valid config that applies to the replication - // component. + // A non-nil reply is an indication that the last block was a valid config that applies + // to the replication component. if reConfig != nil { - br.logger.Infof("New config committed, going to apply to block replicator: %v", reConfig) - // TODO + clusterConfig := reConfig.(*types.ClusterConfig) + if err = br.updateClusterConfig(clusterConfig); err != nil { + br.logger.Panicf("Failed to update ClusterConfig: %s", err) + } } } } + + br.logger.Info("Exiting the block replicator") } // Close signals the internal go-routine to stop and waits for it to exit. 
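[Note on the hunk above] The run loop hands each block to the block processor through the OneQueueBarrier and blocks until that block is committed, receiving an optional re-config object in return. Below is a minimal, self-contained sketch of that handshake pattern; it is a hypothetical simplification, not the bcdb implementation (internal/queue.OneQueueBarrier also returns errors and supports Close, and the consumer-side Dequeue/Reply names are assumed here from their usage in this patch).

package main

import "fmt"

// OneQueueBarrier couples a single-entry queue with a reply barrier:
// the producer blocks until the consumer has processed its entry.
type OneQueueBarrier struct {
	entryCh chan interface{} // carries the single in-flight entry
	replyCh chan interface{} // carries the consumer's reply
}

func NewOneQueueBarrier() *OneQueueBarrier {
	return &OneQueueBarrier{
		entryCh: make(chan interface{}),
		replyCh: make(chan interface{}),
	}
}

// EnqueueWait submits an entry and blocks until the consumer replies.
// On the replicator side, a non-nil reply signals a config change to apply.
func (qb *OneQueueBarrier) EnqueueWait(entry interface{}) interface{} {
	qb.entryCh <- entry
	return <-qb.replyCh
}

// Dequeue blocks until the producer submits an entry.
func (qb *OneQueueBarrier) Dequeue() interface{} {
	return <-qb.entryCh
}

// Reply releases the blocked producer, optionally carrying a re-config object.
func (qb *OneQueueBarrier) Reply(reConfig interface{}) {
	qb.replyCh <- reConfig
}

func main() {
	qb := NewOneQueueBarrier()
	go func() { // block-processor side: commit, then release the replicator
		block := qb.Dequeue()
		fmt.Println("committing", block)
		qb.Reply(nil) // nil: the block carried no config change
	}()
	reConfig := qb.EnqueueWait("block-2") // replicator side: blocks until commit
	fmt.Println("re-config:", reConfig)
}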
@@ -115,3 +137,31 @@ func (br *BlockReplicator) Close() (err error) { return err } + +func (br *BlockReplicator) updateClusterConfig(clusterConfig *types.ClusterConfig) error { + br.logger.Infof("New cluster config committed, going to apply to block replicator: %+v", clusterConfig) + + //TODO dynamic re-config, update transport config, etc + + return errors.New("dynamic re-config of ClusterConfig not supported yet") +} + +func (br *BlockReplicator) Process(ctx context.Context, m raftpb.Message) error { + // see: rafthttp.RAFT + //TODO + return nil +} + +func (br *BlockReplicator) IsIDRemoved(id uint64) bool { + // see: rafthttp.RAFT + //TODO + return false +} +func (br *BlockReplicator) ReportUnreachable(id uint64) { + // see: rafthttp.RAFT + //TODO +} +func (br *BlockReplicator) ReportSnapshot(id uint64, status raft.SnapshotStatus) { + // see: rafthttp.RAFT + //TODO +} diff --git a/internal/replication/blockreplicator_test.go b/internal/replication/blockreplicator_test.go index 0ff3639a..87da547f 100644 --- a/internal/replication/blockreplicator_test.go +++ b/internal/replication/blockreplicator_test.go @@ -4,6 +4,7 @@ package replication_test import ( + "github.com/IBM-Blockchain/bcdb-server/pkg/types" "sync" "testing" @@ -14,6 +15,28 @@ import ( "github.com/stretchr/testify/require" ) +var clusterConfig1node = &types.ClusterConfig{ + Nodes: []*types.NodeConfig{{ + Id: "node1", + Address: "127.0.0.1", + Port: 9090, + }}, + ConsensusConfig: &types.ConsensusConfig{ + Algorithm: "raft", + Members: []*types.PeerConfig{{ + NodeId: "node1", + RaftId: 1, + PeerHost: "127.0.0.1", + PeerPort: 9091, + }}, + RaftConfig: &types.RaftConfig{ + TickInterval: "100ms", + ElectionTicks: 100, + HeartbeatTicks: 10, + }, + }, +} + func TestBlockReplicator_StartClose(t *testing.T) { lg := testLogger(t, "debug") qBarrier := queue.NewOneQueueBarrier(lg) diff --git a/pkg/server/server.go b/pkg/server/server.go index 2b6ba2b9..4c71dfb8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -12,7 +12,6 @@ import ( "github.com/IBM-Blockchain/bcdb-server/internal/httphandler" "github.com/IBM-Blockchain/bcdb-server/pkg/constants" "github.com/IBM-Blockchain/bcdb-server/pkg/logger" - "github.com/IBM-Blockchain/bcdb-server/pkg/types" "github.com/pkg/errors" ) @@ -40,7 +39,7 @@ func New(conf *config.Configurations) (*BCDBHTTPServer, error) { return nil, err } - db, err := bcdb.NewDB(conf.LocalConfig, lg) + db, err := bcdb.NewDB(conf, lg) if err != nil { return nil, errors.Wrap(err, "error while creating the database object") } @@ -73,22 +72,12 @@ func New(conf *config.Configurations) (*BCDBHTTPServer, error) { // Start starts the server func (s *BCDBHTTPServer) Start() error { - blockHeight, err := s.db.LedgerHeight() - if err != nil { + if blockHeight, err := s.db.LedgerHeight(); err != nil { return err - } - if blockHeight == 0 { - s.logger.Infof("Bootstrapping DB for the first time") - resp, err := s.db.BootstrapDB(s.conf) - if err != nil { - return errors.Wrap(err, "error while preparing and committing config transaction") - } - - txReceipt := resp.GetResponse().GetReceipt() - valInfo := txReceipt.GetHeader().GetValidationInfo()[txReceipt.TxIndex] - if valInfo.Flag != types.Flag_VALID { - return errors.Errorf("config transaction was not committed due to invalidation [" + valInfo.ReasonIfInvalid + "]") - } + } else if blockHeight == 0 { + return errors.New("ledger height == 0, bootstrap failed") + } else { + s.logger.Infof("Server starting at ledger height [%d]", blockHeight) } go s.serveRequests(s.listen) 
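[Note on the hunks above] Taken together, they fix a wiring order for the replication components: the transport needs the ClusterConfig (to resolve its own raft ID) and a consensus listener before it starts serving peer traffic, and both must be running before the replicator starts. A condensed sketch of that sequence using this patch's constructors follows (error handling abbreviated, and the blockCreator.RegisterReplicator step omitted); it is a sketch, not a literal excerpt from newTransactionProcessor.

import (
	"github.com/IBM-Blockchain/bcdb-server/config"
	"github.com/IBM-Blockchain/bcdb-server/internal/comm"
	"github.com/IBM-Blockchain/bcdb-server/internal/queue"
	"github.com/IBM-Blockchain/bcdb-server/internal/replication"
	"github.com/IBM-Blockchain/bcdb-server/pkg/logger"
	"github.com/IBM-Blockchain/bcdb-server/pkg/types"
)

func wireReplication(localConf *config.LocalConfiguration, clusterConfig *types.ClusterConfig,
	qb *queue.OneQueueBarrier, lg *logger.SugarLogger) (*replication.BlockReplicator, error) {

	transport := comm.NewHTTPTransport(&comm.Config{LocalConf: localConf, Logger: lg})
	// The transport resolves its raft ID from the Consensus members list,
	// so the cluster config must be pushed before Start().
	if err := transport.UpdateClusterConfig(clusterConfig); err != nil {
		return nil, err
	}

	repl := replication.NewBlockReplicator(&replication.Config{
		LocalConf:            localConf,
		ClusterConfig:        clusterConfig,
		Transport:            transport,
		BlockOneQueueBarrier: qb,
		Logger:               lg,
	})
	// The replicator implements the raft transport callbacks (Process,
	// IsIDRemoved, ...), so it is registered as the consensus listener.
	if err := transport.SetConsensusListener(repl); err != nil {
		return nil, err
	}

	transport.Start() // starts the peer HTTP service (internal goroutine)
	repl.Start()      // starts the replication loop (internal goroutine)
	return repl, nil
}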
diff --git a/test/setup/server_setup.go b/test/setup/server_setup.go index 64c87091..884e0b57 100644 --- a/test/setup/server_setup.go +++ b/test/setup/server_setup.go @@ -189,7 +189,7 @@ func (s *Server) createConfigFile() error { " walDir: " + filepath.Join(s.configDir, "etcdraft", "wal") + "\n" + //TODO create path " snapDir: " + filepath.Join(s.configDir, "etcdraft", "snap") + "\n" + //TODO create path " network:\n" + - " address: 127.0.0.1\n" + + " address: " + s.address + "\n" + " port: " + strconv.FormatInt(int64(s.peerPort), 10) + "\n" + " tls:\n" + //TODO add rest of fields when security is supported " enabled: false\n" +
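[Summary of the new first-start path] With BootstrapDB removed from the DB interface, genesis now happens inside newTransactionProcessor before any component starts, and server.Start() merely verifies that the ledger is non-empty. The following restates that path as one linear flow, stitched together from the hunks above; it assumes the function sits in package bcdb, where PrepareBootstrapConfigTx is defined, and is a condensed sketch rather than additional new code.

import (
	"github.com/IBM-Blockchain/bcdb-server/config"
	"github.com/IBM-Blockchain/bcdb-server/internal/blockcreator"
	"github.com/IBM-Blockchain/bcdb-server/internal/blockprocessor"
	"github.com/IBM-Blockchain/bcdb-server/internal/blockstore"
)

func bootstrapIfNeeded(conf *config.Configurations, store *blockstore.Store, bp *blockprocessor.BlockProcessor) error {
	height, err := store.Height()
	if err != nil {
		return err
	}
	if height > 0 {
		return nil // already bootstrapped; nothing to do on restart
	}
	// Build a ConfigTxEnvelope from SharedConfig; this fails if the local
	// node ID is absent from Nodes or from the Consensus members.
	tx, err := PrepareBootstrapConfigTx(conf)
	if err != nil {
		return err
	}
	// Wrap the config tx in block number 1, the genesis block.
	bootBlock, err := blockcreator.BootstrapBlock(tx)
	if err != nil {
		return err
	}
	// Validate and commit synchronously; for block 1, validateAndCommit
	// returns validation errors instead of panicking.
	return bp.Bootstrap(bootBlock)
}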