Skip to content

Commit

Permalink
expose BlockCensorshipTimeout in config - stage 1
Browse files Browse the repository at this point in the history
Signed-off-by: May Rosenbaum <[email protected]>
  • Loading branch information
MayRosenbaum committed Oct 2, 2023
1 parent 58ecff9 commit 9e0c3af
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 8 deletions.
3 changes: 3 additions & 0 deletions common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package deliver

import (
"context"
"fmt"
"io"
"math"
"strconv"
Expand Down Expand Up @@ -257,6 +258,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
return cb.Status_BAD_REQUEST, nil
}

fmt.Printf("\n\n\n DELIVERRRR: [channel: %s] Received seekInfo (%p) %v from %s \n\n\n", chdr.ChannelId, seekInfo, seekInfo, addr)
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
Expand Down Expand Up @@ -297,6 +299,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
iterCh := make(chan struct{})
go func() {
block, status = cursor.Next()
fmt.Printf("\n\n\n DELIVERRRRR: block data is: %v and block number is: %v \n\n\n", block.GetData(), block.Header.Number)
close(iterCh)
}()

Expand Down
24 changes: 24 additions & 0 deletions core/deliverservice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
DefaultReConnectBackoffThreshold = time.Hour * 1
DefaultReConnectTotalTimeThreshold = time.Second * 60 * 60
DefaultConnectionTimeout = time.Second * 3
DefaultBlockCensorshipTimeout = time.Second * 30
DefaultMinimalReconnectInterval = time.Millisecond * 100
)

// DeliverServiceConfig is the struct that defines the deliverservice configuration.
Expand All @@ -42,6 +44,10 @@ type DeliverServiceConfig struct {
KeepaliveOptions comm.KeepaliveOptions
// SecOpts provides the TLS info for connections
SecOpts comm.SecureOptions
// If a certain header from a header reciever is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed
BlockCensorshipTimeout time.Duration
// The initial value of the actual retry interval, which is increased on every failed retry
MinimalReconnectInterval time.Duration

// OrdererEndpointOverrides is a map of orderer addresses which should be
// re-mapped to a different orderer endpoint.
Expand Down Expand Up @@ -124,6 +130,24 @@ func (c *DeliverServiceConfig) loadDeliverServiceConfig() {
}
c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey)

blockCensorshipTimeout := "peer.deliveryclient.blockCensorshipTimeout"
blockCensorshipTimeoutOptionMissing := !viper.IsSet(blockCensorshipTimeout)
if blockCensorshipTimeoutOptionMissing {
c.BlockCensorshipTimeout = DefaultBlockCensorshipTimeout
logger.Infof("peer.deliveryclient.blockCensorshipTimeout is not set, defaulting to 30s.")
} else {
c.BlockCensorshipTimeout = viper.GetDuration(blockCensorshipTimeout)
}

minimalReconnectInterval := "peer.deliveryclient.minimalReconnectInterval"
MinimalReconnectIntervalOptionMissing := !viper.IsSet(minimalReconnectInterval)
if MinimalReconnectIntervalOptionMissing {
c.MinimalReconnectInterval = DefaultMinimalReconnectInterval
logger.Infof("peer.deliveryclient.minimalReconnectInterval is not set, defaulting to 100ms.")
} else {
c.MinimalReconnectInterval = viper.GetDuration(minimalReconnectInterval)
}

c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled")

c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold")
Expand Down
6 changes: 6 additions & 0 deletions core/deliverservice/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestGlobalConfig(t *testing.T) {
viper.Set("peer.deliveryclient.connTimeout", "10s")
viper.Set("peer.keepalive.deliveryClient.interval", "5s")
viper.Set("peer.keepalive.deliveryClient.timeout", "2s")
viper.Set("peer.deliveryClient.blockCensorshipTimeout", "40s")
viper.Set("peer.deliveryClient.minimalReconnectInterval", "100ms")

coreConfig := deliverservice.GlobalConfig()

Expand All @@ -106,6 +108,8 @@ func TestGlobalConfig(t *testing.T) {
SecOpts: comm.SecureOptions{
UseTLS: true,
},
BlockCensorshipTimeout: time.Second * 40,
MinimalReconnectInterval: time.Millisecond * 100,
}

require.Equal(t, expectedConfig, coreConfig)
Expand All @@ -124,6 +128,8 @@ func TestGlobalConfigDefault(t *testing.T) {
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
ConnectionTimeout: deliverservice.DefaultConnectionTimeout,
KeepaliveOptions: comm.DefaultKeepaliveOptions,
BlockCensorshipTimeout: deliverservice.DefaultBlockCensorshipTimeout,
MinimalReconnectInterval: deliverservice.DefaultMinimalReconnectInterval,
}

require.Equal(t, expectedConfig, coreConfig)
Expand Down
11 changes: 7 additions & 4 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
}

d.channelID = chainID

fmt.Printf("\n\n\n<<<<<<<<< we are in StartDeliverForChannel: we are going to DeliverBlocks >>>>>> \n\n\n")
go func() {
d.blockDeliverer.DeliverBlocks()
finalizer()
Expand Down Expand Up @@ -224,14 +224,17 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
DeliverStreamer: blocksprovider.DeliverAdapter{},
CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
InitialRetryInterval: 100 * time.Millisecond, // TODO expose in config
InitialRetryInterval: d.conf.DeliverServiceConfig.MinimalReconnectInterval,
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
BlockCensorshipTimeout: 30 * time.Second, // TODO expose in config
MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler.
BlockCensorshipTimeout: d.conf.DeliverServiceConfig.BlockCensorshipTimeout,
MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler.
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return false // In v3 block gossip is no longer supported, the peer never stops retrying.
},
}
fmt.Printf("\n\n\n <<<<<<<<<<<<<<< createBlockDelivererBFT >>>>>>>>>>>>>>> \n\n\n")
fmt.Printf("\n\n\n!!! dcBFT.BlockCensorshipTimeout is: !!! %v\n\n\n", dcBFT.BlockCensorshipTimeout)
fmt.Printf("\n\n\n!!! dcBFT.InitialRetryInterval is: !!! %v\n\n\n", dcBFT.InitialRetryInterval)

if d.conf.DeliverServiceConfig.SecOpts.RequireClientCert {
cert, err := d.conf.DeliverServiceConfig.SecOpts.ClientCertificate()
Expand Down
5 changes: 5 additions & 0 deletions integration/nwo/template/core_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ peer:
localMspId: {{ (.Organization Peer.Organization).MSPID }}
deliveryclient:
reconnectTotalTimeThreshold: 3600s
blockGossipEnabled: true
connTimeout: 3s
reConnectBackoffThreshold: 3600s
blockCensorshipTimeout: 20s
minimalReconnectInterval: 110ms
localMspType: bccsp
profile:
enabled: false
Expand Down
1 change: 1 addition & 0 deletions internal/peer/gossip/mcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *MSPMessageCryptoService) VerifyBlock(chainID common.ChannelID, seqNum u
}

// - Extract channelID and compare with chainID
fmt.Printf("\n\n\n!!!we are inside verifyBlock!!! %v\n\n\n", block)
channelID, err := protoutil.GetChannelIDFromBlock(block)
if err != nil {
return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
Expand Down
8 changes: 7 additions & 1 deletion internal/pkg/peer/blocksprovider/bft_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (d *BFTDeliverer) DeliverBlocks() {
MaxRetryInterval: d.MaxRetryInterval,
BlockCensorshipTimeout: d.BlockCensorshipTimeout,
}
fmt.Printf("\n\n\n!!!we are inside DeliverBlocks. timeoutConfig.BlockCensorshipTimeout is: !!! %v\n\n\n", timeoutConfig.BlockCensorshipTimeout)

// Refresh and randomize the sources, selects a random initial source, and incurs a random iteration order.
d.refreshSources()
Expand Down Expand Up @@ -171,6 +172,8 @@ FetchAndMonitorLoop:
}
// start a block fetcher and a monitor
// a buffered channel so that the fetcher goroutine can send an error and exit w/o waiting for it to be consumed.
fmt.Printf("\n\n\n!!! we are before FetchBlocks. !!!!\n\n\n")

d.fetchErrorsC = make(chan error, 1)
source := d.fetchSources[d.fetchSourceIndex]
go d.FetchBlocks(source)
Expand Down Expand Up @@ -268,7 +271,8 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
return
default:
}

fmt.Printf("\n\n\n <<<<<<<<<<<<<<< Trying to fetch blocks from orderer: %s\n\n\n", source.Address)
fmt.Printf("\n\n\n <<<<<<<<<<<<<<< Number of Block to get: %v\n\n\n", d.getNextBlockNumber())
seekInfoEnv, err := d.requester.SeekInfoBlocksFrom(d.getNextBlockNumber())
if err != nil {
d.Logger.Errorf("Could not create a signed Deliver SeekInfo message, something is critically wrong: %s", err)
Expand All @@ -277,6 +281,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
}

deliverClient, cancel, err := d.requester.Connect(seekInfoEnv, source)
fmt.Printf("<<<<<<<<<<<<<<<connet to an ordering service: %v>>>>>>>>>>>>>>>>", source)
if err != nil {
d.Logger.Warningf("Could not connect to ordering service: %s", err)
d.fetchErrorsC <- errors.Wrapf(err, "could not connect to ordering service, orderer-address: %s", source.Address)
Expand All @@ -303,6 +308,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
blockRcv.Start()

// Consume blocks fom the `recvC` channel
fmt.Printf("\n\n\n!!!we are before process incoming!!! \n\n\n")
if errProc := blockRcv.ProcessIncoming(d.onBlockProcessingSuccess); errProc != nil {
switch errProc.(type) {
case *ErrStopping:
Expand Down
7 changes: 6 additions & 1 deletion internal/pkg/peer/blocksprovider/block_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ func (br *BlockReceiver) Start() {
br.logger.Infof("BlockReceiver starting")
go func() {
for {
// fmt.Printf("we get nil from Recv")
resp, err := br.deliverClient.Recv()
fmt.Printf("\n\n\n !!! 1-get block from receiver - data of block is %v\n\n\n", resp.GetBlock().GetData())
fmt.Printf("!!! 2-we are inside start!!! and the data is: %v", resp.GetBlock().GetData())
if err != nil {
br.logger.Warningf("Encountered an error reading from deliver stream: %s", err)
close(br.recvC)
Expand Down Expand Up @@ -88,7 +91,7 @@ func (br *BlockReceiver) Stop() {
// ProcessIncoming processes incoming messages until stopped or encounters an error.
func (br *BlockReceiver) ProcessIncoming(onSuccess func(blockNum uint64)) error {
var err error

fmt.Printf("\n\n\n!!!we are inside process incoming!!! \n\n\n")
RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection
for {
select {
Expand All @@ -103,6 +106,7 @@ RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the c
break RecvLoop
}
var blockNum uint64
fmt.Printf("!!!we are inside process incoming!!! and the data is from type: %v", response.GetType())
blockNum, err = br.processMsg(response)
if err != nil {
br.logger.Warningf("Got error while attempting to receive blocks: %v", err)
Expand Down Expand Up @@ -134,6 +138,7 @@ func (br *BlockReceiver) processMsg(msg *orderer.DeliverResponse) (uint64, error
return 0, errors.Errorf("received bad status %v from orderer", t.Status)
case *orderer.DeliverResponse_Block:
blockNum := t.Block.Header.Number
fmt.Printf("\n\n\n!!!we are inside processMsg. !!! data is: %v\n\n\n", t.Block.Data.GetData())
if err := br.blockVerifier.VerifyBlock(gossipcommon.ChannelID(br.channelID), blockNum, t.Block); err != nil {
return 0, errors.WithMessage(err, "block from orderer could not be verified")
}
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/peer/blocksprovider/delivery_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package blocksprovider

import (
"context"
"fmt"
"math"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -46,6 +47,7 @@ func NewDeliveryRequester(

// SeekInfoBlocksFrom produces a signed SeekInfo envelope requesting a stream of blocks from a certain block number.
func (dr *DeliveryRequester) SeekInfoBlocksFrom(ledgerHeight uint64) (*common.Envelope, error) {
fmt.Printf("\n\n\n <<<<<<<<<<<<<<< SeekInfoBlocksFrom >>>>>>>>>>>>>>> \n\n\n")
return protoutil.CreateSignedEnvelopeWithTLSBinding(
common.HeaderType_DELIVER_SEEK_INFO,
dr.channelID,
Expand All @@ -60,6 +62,7 @@ func (dr *DeliveryRequester) SeekInfoBlocksFrom(ledgerHeight uint64) (*common.En
// SeekInfoHeadersFrom produces a signed SeekInfo envelope requesting a stream of headers (block attestations) from
// a certain block number.
func (dr *DeliveryRequester) SeekInfoHeadersFrom(ledgerHeight uint64) (*common.Envelope, error) {
fmt.Printf("\n\n\n <<<<<<<<<<<<<<< SeekInfoHeadersFrom >>>>>>>>>>>>>>> \n\n\n")
return protoutil.CreateSignedEnvelopeWithTLSBinding(
common.HeaderType_DELIVER_SEEK_INFO,
dr.channelID,
Expand Down
2 changes: 2 additions & 0 deletions orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,10 @@ func (c *BFTChain) Deliver(proposal types.Proposal, signatures []types.Signature
c.reportIsLeader() // report the leader
if protoutil.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
fmt.Printf("/n/n/n writing config block with data: %v", block.GetData())
} else {
c.support.WriteBlock(block, nil)
fmt.Printf("/n/n/n writing config block with data: %v", block.GetData())
}

reconfig := c.updateRuntimeConfig(block)
Expand Down
10 changes: 8 additions & 2 deletions protoutil/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,14 @@ func GetChannelIDFromBlockBytes(bytes []byte) (string, error) {

// GetChannelIDFromBlock returns channel ID in the block
func GetChannelIDFromBlock(block *cb.Block) (string, error) {
fmt.Printf("\n\n\n###1: GetChannelIDFromBlock, the block is: %v\n\n\n", block)
if block == nil || block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 {
return "", errors.New("failed to retrieve channel id - block is empty")
if block != nil {
fmt.Printf("\n\n\n###2 %v\n\n\n", block.Data)
}
panic("!")
// fmt.Printf("\n\n\n###2%v\n\n\n", block.Data)
// return "", errors.New("failed to retrieve channel id - block is empty")
}
var err error
envelope, err := GetEnvelopeFromBlock(block.Data.Data[0])
Expand All @@ -103,7 +109,7 @@ func GetChannelIDFromBlock(block *cb.Block) (string, error) {
if err != nil {
return "", err
}

// fmt.Printf("\n\n\n??? %v\n %v \n\n\n", payload, chdr)
return chdr.ChannelId, nil
}

Expand Down
5 changes: 5 additions & 0 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ peer:
# Time between retries will have exponential backoff until hitting this threshold.
reConnectBackoffThreshold: 3600s

# If a certain header from a header receiver is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed
blockCensorshipTimeout: 20s

minimalReconnectInterval: 110ms

# A list of orderer endpoint addresses which should be overridden
# when found in channel configurations.
addressOverrides:
Expand Down

0 comments on commit 9e0c3af

Please sign in to comment.