diff --git a/core/deliverservice/config.go b/core/deliverservice/config.go index ea908f521f9..50d35267f1a 100644 --- a/core/deliverservice/config.go +++ b/core/deliverservice/config.go @@ -23,6 +23,8 @@ const ( DefaultReConnectBackoffThreshold = time.Hour * 1 DefaultReConnectTotalTimeThreshold = time.Second * 60 * 60 DefaultConnectionTimeout = time.Second * 3 + DefaultBlockCensorshipTimeout = time.Second * 30 + DefaultMinimalReconnectInterval = time.Millisecond * 100 ) // DeliverServiceConfig is the struct that defines the deliverservice configuration. @@ -42,6 +44,10 @@ type DeliverServiceConfig struct { KeepaliveOptions comm.KeepaliveOptions // SecOpts provides the TLS info for connections SecOpts comm.SecureOptions + // If a certain header from a header reciever is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed + BlockCensorshipTimeout time.Duration + // The initial value of the actual retry interval, which is increased on every failed retry + MinimalReconnectInterval time.Duration // OrdererEndpointOverrides is a map of orderer addresses which should be // re-mapped to a different orderer endpoint. @@ -124,6 +130,24 @@ func (c *DeliverServiceConfig) loadDeliverServiceConfig() { } c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey) + blockCensorshipTimeout := "peer.deliveryclient.blockCensorshipTimeout" + blockCensorshipTimeoutOptionMissing := !viper.IsSet(blockCensorshipTimeout) + if blockCensorshipTimeoutOptionMissing { + c.BlockCensorshipTimeout = DefaultBlockCensorshipTimeout + logger.Infof("peer.deliveryclient.blockCensorshipTimeout is not set, defaulting to 30s.") + } else { + c.BlockCensorshipTimeout = viper.GetDuration(blockCensorshipTimeout) + } + + minimalReconnectInterval := "peer.deliveryclient.minimalReconnectInterval" + MinimalReconnectIntervalOptionMissing := !viper.IsSet(minimalReconnectInterval) + if MinimalReconnectIntervalOptionMissing { + c.MinimalReconnectInterval = DefaultMinimalReconnectInterval + logger.Infof("peer.deliveryclient.minimalReconnectInterval is not set, defaulting to 100ms.") + } else { + c.MinimalReconnectInterval = viper.GetDuration(minimalReconnectInterval) + } + c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled") c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold") diff --git a/core/deliverservice/config_test.go b/core/deliverservice/config_test.go index ddaa069e035..6f09cebef66 100644 --- a/core/deliverservice/config_test.go +++ b/core/deliverservice/config_test.go @@ -87,6 +87,8 @@ func TestGlobalConfig(t *testing.T) { viper.Set("peer.deliveryclient.connTimeout", "10s") viper.Set("peer.keepalive.deliveryClient.interval", "5s") viper.Set("peer.keepalive.deliveryClient.timeout", "2s") + viper.Set("peer.deliveryClient.blockCensorshipTimeout", "40s") + viper.Set("peer.deliveryClient.minimalReconnectInterval", "100ms") coreConfig := deliverservice.GlobalConfig() @@ -106,6 +108,8 @@ func TestGlobalConfig(t *testing.T) { SecOpts: comm.SecureOptions{ UseTLS: true, }, + BlockCensorshipTimeout: time.Second * 40, + MinimalReconnectInterval: time.Millisecond * 100, } require.Equal(t, expectedConfig, coreConfig) @@ -124,6 +128,8 @@ func TestGlobalConfigDefault(t *testing.T) { ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold, ConnectionTimeout: deliverservice.DefaultConnectionTimeout, KeepaliveOptions: comm.DefaultKeepaliveOptions, + BlockCensorshipTimeout: deliverservice.DefaultBlockCensorshipTimeout, + MinimalReconnectInterval: deliverservice.DefaultMinimalReconnectInterval, } require.Equal(t, expectedConfig, coreConfig) diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 9dabaa3f900..0de1ccf15fd 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -224,10 +224,10 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo DeliverStreamer: blocksprovider.DeliverAdapter{}, CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{}, Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), - InitialRetryInterval: 100 * time.Millisecond, // TODO expose in config + InitialRetryInterval: d.conf.DeliverServiceConfig.MinimalReconnectInterval, MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, - BlockCensorshipTimeout: 30 * time.Second, // TODO expose in config - MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler. + BlockCensorshipTimeout: d.conf.DeliverServiceConfig.BlockCensorshipTimeout, + MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler. MaxRetryDurationExceededHandler: func() (stopRetries bool) { return false // In v3 block gossip is no longer supported, the peer never stops retrying. }, diff --git a/integration/nwo/template/core_template.go b/integration/nwo/template/core_template.go index bbcb54b470a..ceaa093bdbe 100644 --- a/integration/nwo/template/core_template.go +++ b/integration/nwo/template/core_template.go @@ -114,6 +114,11 @@ peer: localMspId: {{ (.Organization Peer.Organization).MSPID }} deliveryclient: reconnectTotalTimeThreshold: 3600s + blockGossipEnabled: true + connTimeout: 3s + reConnectBackoffThreshold: 3600s + blockCensorshipTimeout: 20s + minimalReconnectInterval: 110ms localMspType: bccsp profile: enabled: false diff --git a/internal/peer/gossip/mcs.go b/internal/peer/gossip/mcs.go index b46df8b6abe..57606efffa3 100644 --- a/internal/peer/gossip/mcs.go +++ b/internal/peer/gossip/mcs.go @@ -136,6 +136,7 @@ func (s *MSPMessageCryptoService) VerifyBlock(chainID common.ChannelID, seqNum u } // - Extract channelID and compare with chainID + fmt.Printf("\n\n\n!!! %v\n\n\n", block) channelID, err := protoutil.GetChannelIDFromBlock(block) if err != nil { return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err) diff --git a/protoutil/blockutils.go b/protoutil/blockutils.go index 8527869e49e..d6e91f3ee84 100644 --- a/protoutil/blockutils.go +++ b/protoutil/blockutils.go @@ -83,8 +83,11 @@ func GetChannelIDFromBlockBytes(bytes []byte) (string, error) { // GetChannelIDFromBlock returns channel ID in the block func GetChannelIDFromBlock(block *cb.Block) (string, error) { + fmt.Printf("\n\n\n###1%v\n\n\n", block) if block == nil || block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 { - return "", errors.New("failed to retrieve channel id - block is empty") + panic("!") + // fmt.Printf("\n\n\n###2%v\n\n\n", block.Data) + // return "", errors.New("failed to retrieve channel id - block is empty") } var err error envelope, err := GetEnvelopeFromBlock(block.Data.Data[0]) @@ -103,7 +106,7 @@ func GetChannelIDFromBlock(block *cb.Block) (string, error) { if err != nil { return "", err } - + fmt.Printf("\n\n\n??? %v\n %v \n\n\n", payload, chdr) return chdr.ChannelId, nil } diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 7809b001daa..4613e7579be 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -386,6 +386,11 @@ peer: # Time between retries will have exponential backoff until hitting this threshold. reConnectBackoffThreshold: 3600s + # If a certain header from a header receiver is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed + blockCensorshipTimeout: 20s + + minimalReconnectInterval: 110ms + # A list of orderer endpoint addresses which should be overridden # when found in channel configurations. addressOverrides: