diff --git a/common/deliver/deliver.go b/common/deliver/deliver.go index d42687edaaa..55772a067b0 100644 --- a/common/deliver/deliver.go +++ b/common/deliver/deliver.go @@ -8,6 +8,7 @@ package deliver import ( "context" + "fmt" "io" "math" "strconv" @@ -257,6 +258,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E return cb.Status_BAD_REQUEST, nil } + fmt.Printf("\n\n\n DELIVERRRR: [channel: %s] Received seekInfo (%p) %v from %s \n\n\n", chdr.ChannelId, seekInfo, seekInfo, addr) logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr) cursor, number := chain.Reader().Iterator(seekInfo.Start) @@ -297,6 +299,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E iterCh := make(chan struct{}) go func() { block, status = cursor.Next() + fmt.Printf("\n\n\n DELIVERRRRR: block data is: %v and block number is: %v \n\n\n", block.GetData(), block.Header.Number) close(iterCh) }() diff --git a/core/deliverservice/config.go b/core/deliverservice/config.go index ea908f521f9..50d35267f1a 100644 --- a/core/deliverservice/config.go +++ b/core/deliverservice/config.go @@ -23,6 +23,8 @@ const ( DefaultReConnectBackoffThreshold = time.Hour * 1 DefaultReConnectTotalTimeThreshold = time.Second * 60 * 60 DefaultConnectionTimeout = time.Second * 3 + DefaultBlockCensorshipTimeout = time.Second * 30 + DefaultMinimalReconnectInterval = time.Millisecond * 100 ) // DeliverServiceConfig is the struct that defines the deliverservice configuration. @@ -42,6 +44,10 @@ type DeliverServiceConfig struct { KeepaliveOptions comm.KeepaliveOptions // SecOpts provides the TLS info for connections SecOpts comm.SecureOptions + // If a certain header from a header reciever is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed + BlockCensorshipTimeout time.Duration + // The initial value of the actual retry interval, which is increased on every failed retry + MinimalReconnectInterval time.Duration // OrdererEndpointOverrides is a map of orderer addresses which should be // re-mapped to a different orderer endpoint. @@ -124,6 +130,24 @@ func (c *DeliverServiceConfig) loadDeliverServiceConfig() { } c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey) + blockCensorshipTimeout := "peer.deliveryclient.blockCensorshipTimeout" + blockCensorshipTimeoutOptionMissing := !viper.IsSet(blockCensorshipTimeout) + if blockCensorshipTimeoutOptionMissing { + c.BlockCensorshipTimeout = DefaultBlockCensorshipTimeout + logger.Infof("peer.deliveryclient.blockCensorshipTimeout is not set, defaulting to 30s.") + } else { + c.BlockCensorshipTimeout = viper.GetDuration(blockCensorshipTimeout) + } + + minimalReconnectInterval := "peer.deliveryclient.minimalReconnectInterval" + MinimalReconnectIntervalOptionMissing := !viper.IsSet(minimalReconnectInterval) + if MinimalReconnectIntervalOptionMissing { + c.MinimalReconnectInterval = DefaultMinimalReconnectInterval + logger.Infof("peer.deliveryclient.minimalReconnectInterval is not set, defaulting to 100ms.") + } else { + c.MinimalReconnectInterval = viper.GetDuration(minimalReconnectInterval) + } + c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled") c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold") diff --git a/core/deliverservice/config_test.go b/core/deliverservice/config_test.go index ddaa069e035..6f09cebef66 100644 --- a/core/deliverservice/config_test.go +++ b/core/deliverservice/config_test.go @@ -87,6 +87,8 @@ func TestGlobalConfig(t *testing.T) { viper.Set("peer.deliveryclient.connTimeout", "10s") viper.Set("peer.keepalive.deliveryClient.interval", "5s") viper.Set("peer.keepalive.deliveryClient.timeout", "2s") + viper.Set("peer.deliveryClient.blockCensorshipTimeout", "40s") + viper.Set("peer.deliveryClient.minimalReconnectInterval", "100ms") coreConfig := deliverservice.GlobalConfig() @@ -106,6 +108,8 @@ func TestGlobalConfig(t *testing.T) { SecOpts: comm.SecureOptions{ UseTLS: true, }, + BlockCensorshipTimeout: time.Second * 40, + MinimalReconnectInterval: time.Millisecond * 100, } require.Equal(t, expectedConfig, coreConfig) @@ -124,6 +128,8 @@ func TestGlobalConfigDefault(t *testing.T) { ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold, ConnectionTimeout: deliverservice.DefaultConnectionTimeout, KeepaliveOptions: comm.DefaultKeepaliveOptions, + BlockCensorshipTimeout: deliverservice.DefaultBlockCensorshipTimeout, + MinimalReconnectInterval: deliverservice.DefaultMinimalReconnectInterval, } require.Equal(t, expectedConfig, coreConfig) diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 9dabaa3f900..ff6f10b7b99 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -148,7 +148,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b } d.channelID = chainID - + fmt.Printf("\n\n\n<<<<<<<<< we are in StartDeliverForChannel: we are going to DeliverBlocks >>>>>> \n\n\n") go func() { d.blockDeliverer.DeliverBlocks() finalizer() @@ -224,14 +224,17 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo DeliverStreamer: blocksprovider.DeliverAdapter{}, CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{}, Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), - InitialRetryInterval: 100 * time.Millisecond, // TODO expose in config + InitialRetryInterval: d.conf.DeliverServiceConfig.MinimalReconnectInterval, MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, - BlockCensorshipTimeout: 30 * time.Second, // TODO expose in config - MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler. + BlockCensorshipTimeout: d.conf.DeliverServiceConfig.BlockCensorshipTimeout, + MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler. MaxRetryDurationExceededHandler: func() (stopRetries bool) { return false // In v3 block gossip is no longer supported, the peer never stops retrying. }, } + fmt.Printf("\n\n\n <<<<<<<<<<<<<<< createBlockDelivererBFT >>>>>>>>>>>>>>> \n\n\n") + fmt.Printf("\n\n\n!!! dcBFT.BlockCensorshipTimeout is: !!! %v\n\n\n", dcBFT.BlockCensorshipTimeout) + fmt.Printf("\n\n\n!!! dcBFT.InitialRetryInterval is: !!! %v\n\n\n", dcBFT.InitialRetryInterval) if d.conf.DeliverServiceConfig.SecOpts.RequireClientCert { cert, err := d.conf.DeliverServiceConfig.SecOpts.ClientCertificate() diff --git a/integration/nwo/template/core_template.go b/integration/nwo/template/core_template.go index bbcb54b470a..ceaa093bdbe 100644 --- a/integration/nwo/template/core_template.go +++ b/integration/nwo/template/core_template.go @@ -114,6 +114,11 @@ peer: localMspId: {{ (.Organization Peer.Organization).MSPID }} deliveryclient: reconnectTotalTimeThreshold: 3600s + blockGossipEnabled: true + connTimeout: 3s + reConnectBackoffThreshold: 3600s + blockCensorshipTimeout: 20s + minimalReconnectInterval: 110ms localMspType: bccsp profile: enabled: false diff --git a/internal/peer/gossip/mcs.go b/internal/peer/gossip/mcs.go index b46df8b6abe..ac008d655ca 100644 --- a/internal/peer/gossip/mcs.go +++ b/internal/peer/gossip/mcs.go @@ -136,6 +136,7 @@ func (s *MSPMessageCryptoService) VerifyBlock(chainID common.ChannelID, seqNum u } // - Extract channelID and compare with chainID + fmt.Printf("\n\n\n!!!we are inside verifyBlock!!! %v\n\n\n", block) channelID, err := protoutil.GetChannelIDFromBlock(block) if err != nil { return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err) diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index f328019dd45..3cd69a22f6c 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -134,6 +134,7 @@ func (d *BFTDeliverer) DeliverBlocks() { MaxRetryInterval: d.MaxRetryInterval, BlockCensorshipTimeout: d.BlockCensorshipTimeout, } + fmt.Printf("\n\n\n!!!we are inside DeliverBlocks. timeoutConfig.BlockCensorshipTimeout is: !!! %v\n\n\n", timeoutConfig.BlockCensorshipTimeout) // Refresh and randomize the sources, selects a random initial source, and incurs a random iteration order. d.refreshSources() @@ -171,6 +172,8 @@ FetchAndMonitorLoop: } // start a block fetcher and a monitor // a buffered channel so that the fetcher goroutine can send an error and exit w/o waiting for it to be consumed. + fmt.Printf("\n\n\n!!! we are before FetchBlocks. !!!!\n\n\n") + d.fetchErrorsC = make(chan error, 1) source := d.fetchSources[d.fetchSourceIndex] go d.FetchBlocks(source) @@ -268,7 +271,8 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { return default: } - + fmt.Printf("\n\n\n <<<<<<<<<<<<<<< Trying to fetch blocks from orderer: %s\n\n\n", source.Address) + fmt.Printf("\n\n\n <<<<<<<<<<<<<<< Number of Block to get: %v\n\n\n", d.getNextBlockNumber()) seekInfoEnv, err := d.requester.SeekInfoBlocksFrom(d.getNextBlockNumber()) if err != nil { d.Logger.Errorf("Could not create a signed Deliver SeekInfo message, something is critically wrong: %s", err) @@ -277,6 +281,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { } deliverClient, cancel, err := d.requester.Connect(seekInfoEnv, source) + fmt.Printf("<<<<<<<<<<<<<<>>>>>>>>>>>>>>>", source) if err != nil { d.Logger.Warningf("Could not connect to ordering service: %s", err) d.fetchErrorsC <- errors.Wrapf(err, "could not connect to ordering service, orderer-address: %s", source.Address) @@ -303,6 +308,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { blockRcv.Start() // Consume blocks fom the `recvC` channel + fmt.Printf("\n\n\n!!!we are before process incoming!!! \n\n\n") if errProc := blockRcv.ProcessIncoming(d.onBlockProcessingSuccess); errProc != nil { switch errProc.(type) { case *ErrStopping: diff --git a/internal/pkg/peer/blocksprovider/block_receiver.go b/internal/pkg/peer/blocksprovider/block_receiver.go index 0ffb7dba6d4..5ee33f4965b 100644 --- a/internal/pkg/peer/blocksprovider/block_receiver.go +++ b/internal/pkg/peer/blocksprovider/block_receiver.go @@ -49,7 +49,10 @@ func (br *BlockReceiver) Start() { br.logger.Infof("BlockReceiver starting") go func() { for { + // fmt.Printf("we get nil from Recv") resp, err := br.deliverClient.Recv() + fmt.Printf("\n\n\n !!! 1-get block from receiver - data of block is %v\n\n\n", resp.GetBlock().GetData()) + fmt.Printf("!!! 2-we are inside start!!! and the data is: %v", resp.GetBlock().GetData()) if err != nil { br.logger.Warningf("Encountered an error reading from deliver stream: %s", err) close(br.recvC) @@ -88,7 +91,7 @@ func (br *BlockReceiver) Stop() { // ProcessIncoming processes incoming messages until stopped or encounters an error. func (br *BlockReceiver) ProcessIncoming(onSuccess func(blockNum uint64)) error { var err error - + fmt.Printf("\n\n\n!!!we are inside process incoming!!! \n\n\n") RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the connection for { select { @@ -103,6 +106,7 @@ RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the c break RecvLoop } var blockNum uint64 + fmt.Printf("!!!we are inside process incoming!!! and the data is from type: %v", response.GetType()) blockNum, err = br.processMsg(response) if err != nil { br.logger.Warningf("Got error while attempting to receive blocks: %v", err) @@ -134,6 +138,7 @@ func (br *BlockReceiver) processMsg(msg *orderer.DeliverResponse) (uint64, error return 0, errors.Errorf("received bad status %v from orderer", t.Status) case *orderer.DeliverResponse_Block: blockNum := t.Block.Header.Number + fmt.Printf("\n\n\n!!!we are inside processMsg. !!! data is: %v\n\n\n", t.Block.Data.GetData()) if err := br.blockVerifier.VerifyBlock(gossipcommon.ChannelID(br.channelID), blockNum, t.Block); err != nil { return 0, errors.WithMessage(err, "block from orderer could not be verified") } diff --git a/internal/pkg/peer/blocksprovider/delivery_requester.go b/internal/pkg/peer/blocksprovider/delivery_requester.go index b13ae0d9248..fc46ae7acc0 100644 --- a/internal/pkg/peer/blocksprovider/delivery_requester.go +++ b/internal/pkg/peer/blocksprovider/delivery_requester.go @@ -8,6 +8,7 @@ package blocksprovider import ( "context" + "fmt" "math" "github.com/hyperledger/fabric-protos-go/common" @@ -46,6 +47,7 @@ func NewDeliveryRequester( // SeekInfoBlocksFrom produces a signed SeekInfo envelope requesting a stream of blocks from a certain block number. func (dr *DeliveryRequester) SeekInfoBlocksFrom(ledgerHeight uint64) (*common.Envelope, error) { + fmt.Printf("\n\n\n <<<<<<<<<<<<<<< SeekInfoBlocksFrom >>>>>>>>>>>>>>> \n\n\n") return protoutil.CreateSignedEnvelopeWithTLSBinding( common.HeaderType_DELIVER_SEEK_INFO, dr.channelID, @@ -60,6 +62,7 @@ func (dr *DeliveryRequester) SeekInfoBlocksFrom(ledgerHeight uint64) (*common.En // SeekInfoHeadersFrom produces a signed SeekInfo envelope requesting a stream of headers (block attestations) from // a certain block number. func (dr *DeliveryRequester) SeekInfoHeadersFrom(ledgerHeight uint64) (*common.Envelope, error) { + fmt.Printf("\n\n\n <<<<<<<<<<<<<<< SeekInfoHeadersFrom >>>>>>>>>>>>>>> \n\n\n") return protoutil.CreateSignedEnvelopeWithTLSBinding( common.HeaderType_DELIVER_SEEK_INFO, dr.channelID, diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index f62685197b9..744ce2c4770 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -419,8 +419,10 @@ func (c *BFTChain) Deliver(proposal types.Proposal, signatures []types.Signature c.reportIsLeader() // report the leader if protoutil.IsConfigBlock(block) { c.support.WriteConfigBlock(block, nil) + fmt.Printf("/n/n/n writing config block with data: %v", block.GetData()) } else { c.support.WriteBlock(block, nil) + fmt.Printf("/n/n/n writing config block with data: %v", block.GetData()) } reconfig := c.updateRuntimeConfig(block) diff --git a/protoutil/blockutils.go b/protoutil/blockutils.go index 8527869e49e..77bc3d75508 100644 --- a/protoutil/blockutils.go +++ b/protoutil/blockutils.go @@ -83,8 +83,14 @@ func GetChannelIDFromBlockBytes(bytes []byte) (string, error) { // GetChannelIDFromBlock returns channel ID in the block func GetChannelIDFromBlock(block *cb.Block) (string, error) { + fmt.Printf("\n\n\n###1: %v\n\n\n", block) if block == nil || block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 { - return "", errors.New("failed to retrieve channel id - block is empty") + if block != nil { + fmt.Printf("\n\n\n###2 %v\n\n\n", block.Data) + } + panic("!") + // fmt.Printf("\n\n\n###2%v\n\n\n", block.Data) + // return "", errors.New("failed to retrieve channel id - block is empty") } var err error envelope, err := GetEnvelopeFromBlock(block.Data.Data[0]) @@ -103,7 +109,7 @@ func GetChannelIDFromBlock(block *cb.Block) (string, error) { if err != nil { return "", err } - + //fmt.Printf("\n\n\n??? %v\n %v \n\n\n", payload, chdr) return chdr.ChannelId, nil } diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 7809b001daa..4613e7579be 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -386,6 +386,11 @@ peer: # Time between retries will have exponential backoff until hitting this threshold. reConnectBackoffThreshold: 3600s + # If a certain header from a header receiver is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed + blockCensorshipTimeout: 20s + + minimalReconnectInterval: 110ms + # A list of orderer endpoint addresses which should be overridden # when found in channel configurations. addressOverrides: