Skip to content

Commit

Permalink
Integrate transport into replicator (hyperledger-labs#11)
Browse files Browse the repository at this point in the history
Initialize and start comm.HTTPTransport and blockreplicator in transaction processor.
Simplify the server bootstrap sequence, pull it to execute creating and starting the replicator and transport.
No Raft yet, no dynamic config of ClusterConfig.

Signed-off-by: Yoav Tock <[email protected]>
  • Loading branch information
tock-ibm authored Jun 24, 2021
1 parent dc5c72e commit 575572c
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 352 deletions.
138 changes: 8 additions & 130 deletions internal/bcdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/IBM-Blockchain/bcdb-server/pkg/crypto"
"github.com/IBM-Blockchain/bcdb-server/pkg/logger"
"github.com/IBM-Blockchain/bcdb-server/pkg/types"
"github.com/google/uuid"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -112,10 +111,6 @@ type DB interface {
// timeout error will be returned
SubmitTransaction(tx interface{}, timeout time.Duration) (*types.TxReceiptResponseEnvelope, error)

// BootstrapDB given bootstrap configuration initialize database by
// creating required system tables to include database meta data
BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error)

// IsDBExists returns true if database with given name is exists otherwise false
IsDBExists(name string) bool

Expand All @@ -138,7 +133,8 @@ type db struct {
}

// NewDB creates a new database bcdb which handles both the queries and transactions.
func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB, error) {
func NewDB(conf *config.Configurations, logger *logger.SugarLogger) (DB, error) {
localConf := conf.LocalConfig
if localConf.Server.Database.Name != "leveldb" {
return nil, errors.New("only leveldb is supported as the state database")
}
Expand Down Expand Up @@ -223,17 +219,12 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB

txProcessor, err := newTransactionProcessor(
&txProcessorConfig{
nodeID: localConf.Server.Identity.ID,
db: levelDB,
blockStore: blockStore,
provenanceStore: provenanceStore,
stateTrieStore: stateTrieStore,
txQueueLength: localConf.Server.QueueLength.Transaction,
txBatchQueueLength: localConf.Server.QueueLength.ReorderedTransactionBatch,
blockQueueLength: localConf.Server.QueueLength.Block,
maxTxCountPerBatch: localConf.BlockCreation.MaxTransactionCountPerBlock,
batchTimeout: localConf.BlockCreation.BlockTimeout,
logger: logger,
config: conf,
db: levelDB,
blockStore: blockStore,
provenanceStore: provenanceStore,
stateTrieStore: stateTrieStore,
logger: logger,
},
)
if err != nil {
Expand All @@ -255,20 +246,6 @@ func NewDB(localConf *config.LocalConfiguration, logger *logger.SugarLogger) (DB
}, nil
}

// BootstrapDB bootstraps DB with system tables
func (d *db) BootstrapDB(conf *config.Configurations) (*types.TxReceiptResponseEnvelope, error) {
configTx, err := prepareConfigTx(conf)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare and commit a configuration transaction")
}

resp, err := d.SubmitTransaction(configTx, 30*time.Second)
if err != nil {
return nil, errors.Wrap(err, "error while committing configuration transaction")
}
return resp, nil
}

// LedgerHeight returns ledger height
func (d *db) LedgerHeight() (uint64, error) {
return d.worldstateQueryProcessor.blockStore.Height()
Expand Down Expand Up @@ -752,105 +729,6 @@ func (d *db) signature(response interface{}) ([]byte, error) {
return d.signer.Sign(responseBytes)
}

func prepareConfigTx(conf *config.Configurations) (*types.ConfigTxEnvelope, error) {
certs, err := readCerts(conf)
if err != nil {
return nil, err
}

inNodes := false
var nodes []*types.NodeConfig
for _, node := range conf.SharedConfig.Nodes {
nc := &types.NodeConfig{
Id: node.NodeID,
Address: node.Host,
Port: node.Port,
}
if cert, ok := certs.nodeCertificates[node.NodeID]; ok {
nc.Certificate = cert
} else {
return nil, errors.Errorf("Cannot find certificate for node: %s", node.NodeID)
}
nodes = append(nodes, nc)

if node.NodeID == conf.LocalConfig.Server.Identity.ID {
inNodes = true
}
}
if !inNodes {
return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Nodes: %v", conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Nodes)
}

clusterConfig := &types.ClusterConfig{
Nodes: nodes,
Admins: []*types.Admin{
{
Id: conf.SharedConfig.Admin.ID,
Certificate: certs.adminCert,
},
},
CertAuthConfig: certs.caCerts,
ConsensusConfig: &types.ConsensusConfig{
Algorithm: conf.SharedConfig.Consensus.Algorithm,
Members: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Members)),
Observers: make([]*types.PeerConfig, len(conf.SharedConfig.Consensus.Observers)),
RaftConfig: &types.RaftConfig{
TickInterval: conf.SharedConfig.Consensus.RaftConfig.TickInterval,
ElectionTicks: conf.SharedConfig.Consensus.RaftConfig.ElectionTicks,
HeartbeatTicks: conf.SharedConfig.Consensus.RaftConfig.HeartbeatTicks,
},
},
}

inMembers := false
for i, m := range conf.SharedConfig.Consensus.Members {
clusterConfig.ConsensusConfig.Members[i] = &types.PeerConfig{
NodeId: m.NodeId,
RaftId: m.RaftId,
PeerHost: m.PeerHost,
PeerPort: m.PeerPort,
}
if m.NodeId == conf.LocalConfig.Server.Identity.ID {
inMembers = true
}
}

inObservers := false
for i, m := range conf.SharedConfig.Consensus.Observers {
clusterConfig.ConsensusConfig.Observers[i] = &types.PeerConfig{
NodeId: m.NodeId,
RaftId: m.RaftId,
PeerHost: m.PeerHost,
PeerPort: m.PeerPort,
}
if m.NodeId == conf.LocalConfig.Server.Identity.ID {
inObservers = true
}
}

if !inMembers && !inObservers {
return nil, errors.Errorf("Cannot find local Server.Identity.ID [%s] in SharedConfig.Consensus Members or Observers: %v",
conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus)
}
if inObservers && inMembers {
return nil, errors.Errorf("local Server.Identity.ID [%s] cannot be in SharedConfig.Consensus both Members and Observers: %v",
conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus)
}
// TODO add support for observers, see issue: https://github.ibm.com/blockchaindb/server/issues/403
if inObservers {
return nil, errors.Errorf("not supported yet: local Server.Identity.ID [%s] is in SharedConfig.Consensus.Observers: %v",
conf.LocalConfig.Server.Identity.ID, conf.SharedConfig.Consensus)
}

return &types.ConfigTxEnvelope{
Payload: &types.ConfigTx{
TxId: uuid.New().String(), // TODO: we need to change TxID to string
NewConfig: clusterConfig,
},
// TODO: we can make the node itself sign the transaction
}, nil
}

type certsInGenesisConfig struct {
nodeCertificates map[string][]byte
adminCert []byte
Expand Down
37 changes: 9 additions & 28 deletions internal/bcdb/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 575572c

Please sign in to comment.