Skip to content

Commit

Permalink
expose BlockCensorshipTimeout in config - stage 1
Browse files Browse the repository at this point in the history
Signed-off-by: May Rosenbaum <[email protected]>
  • Loading branch information
MayRosenbaum committed Sep 18, 2023
1 parent 58ecff9 commit dbf2467
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 4 deletions.
24 changes: 24 additions & 0 deletions core/deliverservice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
DefaultReConnectBackoffThreshold = time.Hour * 1
DefaultReConnectTotalTimeThreshold = time.Second * 60 * 60
DefaultConnectionTimeout = time.Second * 3
DefaultBlockCensorshipTimeout = time.Second * 30
DefaultMinimalReconnectInterval = time.Millisecond * 100
)

// DeliverServiceConfig is the struct that defines the deliverservice configuration.
Expand All @@ -42,6 +44,10 @@ type DeliverServiceConfig struct {
KeepaliveOptions comm.KeepaliveOptions
// SecOpts provides the TLS info for connections
SecOpts comm.SecureOptions
// If a certain header from a header reciever is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed
BlockCensorshipTimeout time.Duration
// The initial value of the actual retry interval, which is increased on every failed retry
MinimalReconnectInterval time.Duration

// OrdererEndpointOverrides is a map of orderer addresses which should be
// re-mapped to a different orderer endpoint.
Expand Down Expand Up @@ -124,6 +130,24 @@ func (c *DeliverServiceConfig) loadDeliverServiceConfig() {
}
c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey)

blockCensorshipTimeout := "peer.deliveryclient.blockCensorshipTimeout"
blockCensorshipTimeoutOptionMissing := !viper.IsSet(blockCensorshipTimeout)
if blockCensorshipTimeoutOptionMissing {
c.BlockCensorshipTimeout = DefaultBlockCensorshipTimeout
logger.Infof("peer.deliveryclient.blockCensorshipTimeout is not set, defaulting to 30s.")
} else {
c.BlockCensorshipTimeout = viper.GetDuration(blockCensorshipTimeout)
}

minimalReconnectInterval := "peer.deliveryclient.minimalReconnectInterval"
MinimalReconnectIntervalOptionMissing := !viper.IsSet(minimalReconnectInterval)
if MinimalReconnectIntervalOptionMissing {
c.MinimalReconnectInterval = DefaultMinimalReconnectInterval
logger.Infof("peer.deliveryclient.minimalReconnectInterval is not set, defaulting to 100ms.")
} else {
c.MinimalReconnectInterval = viper.GetDuration(minimalReconnectInterval)
}

c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled")

c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold")
Expand Down
6 changes: 6 additions & 0 deletions core/deliverservice/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestGlobalConfig(t *testing.T) {
viper.Set("peer.deliveryclient.connTimeout", "10s")
viper.Set("peer.keepalive.deliveryClient.interval", "5s")
viper.Set("peer.keepalive.deliveryClient.timeout", "2s")
viper.Set("peer.deliveryClient.blockCensorshipTimeout", "40s")
viper.Set("peer.deliveryClient.minimalReconnectInterval", "100ms")

coreConfig := deliverservice.GlobalConfig()

Expand All @@ -106,6 +108,8 @@ func TestGlobalConfig(t *testing.T) {
SecOpts: comm.SecureOptions{
UseTLS: true,
},
BlockCensorshipTimeout: time.Second * 40,
MinimalReconnectInterval: time.Millisecond * 100,
}

require.Equal(t, expectedConfig, coreConfig)
Expand All @@ -124,6 +128,8 @@ func TestGlobalConfigDefault(t *testing.T) {
ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold,
ConnectionTimeout: deliverservice.DefaultConnectionTimeout,
KeepaliveOptions: comm.DefaultKeepaliveOptions,
BlockCensorshipTimeout: deliverservice.DefaultBlockCensorshipTimeout,
MinimalReconnectInterval: deliverservice.DefaultMinimalReconnectInterval,
}

require.Equal(t, expectedConfig, coreConfig)
Expand Down
6 changes: 3 additions & 3 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
DeliverStreamer: blocksprovider.DeliverAdapter{},
CensorshipDetectorFactory: &blocksprovider.BFTCensorshipMonitorFactory{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
InitialRetryInterval: 100 * time.Millisecond, // TODO expose in config
InitialRetryInterval: d.conf.DeliverServiceConfig.MinimalReconnectInterval,
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
BlockCensorshipTimeout: 30 * time.Second, // TODO expose in config
MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler.
BlockCensorshipTimeout: d.conf.DeliverServiceConfig.BlockCensorshipTimeout,
MaxRetryDuration: 12 * time.Hour, // In v3 block gossip is no longer supported. We set it long to avoid needlessly calling the handler.
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return false // In v3 block gossip is no longer supported, the peer never stops retrying.
},
Expand Down
5 changes: 5 additions & 0 deletions integration/nwo/template/core_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ peer:
localMspId: {{ (.Organization Peer.Organization).MSPID }}
deliveryclient:
reconnectTotalTimeThreshold: 3600s
blockGossipEnabled: true
connTimeout: 3s
reConnectBackoffThreshold: 3600s
blockCensorshipTimeout: 20s
minimalReconnectInterval: 110ms
localMspType: bccsp
profile:
enabled: false
Expand Down
1 change: 1 addition & 0 deletions internal/peer/gossip/mcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *MSPMessageCryptoService) VerifyBlock(chainID common.ChannelID, seqNum u
}

// - Extract channelID and compare with chainID
fmt.Printf("\n\n\n!!! %v\n\n\n", block)
channelID, err := protoutil.GetChannelIDFromBlock(block)
if err != nil {
return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
Expand Down
5 changes: 4 additions & 1 deletion protoutil/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func GetChannelIDFromBlockBytes(bytes []byte) (string, error) {

// GetChannelIDFromBlock returns channel ID in the block
func GetChannelIDFromBlock(block *cb.Block) (string, error) {
fmt.Printf("\n\n\n###1%v\n\n\n", block)
if block == nil || block.Data == nil || block.Data.Data == nil || len(block.Data.Data) == 0 {
fmt.Printf("\n\n\n###2%v\n\n\n", block.Data)
panic("!")
return "", errors.New("failed to retrieve channel id - block is empty")
}
var err error
Expand All @@ -103,7 +106,7 @@ func GetChannelIDFromBlock(block *cb.Block) (string, error) {
if err != nil {
return "", err
}

fmt.Printf("\n\n\n??? %v\n %v \n\n\n", payload, chdr)
return chdr.ChannelId, nil
}

Expand Down
5 changes: 5 additions & 0 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ peer:
# Time between retries will have exponential backoff until hitting this threshold.
reConnectBackoffThreshold: 3600s

# If a certain header from a header receiver is in front of the block receiver for more than this time, a censorship event is declared and the block source is changed
blockCensorshipTimeout: 20s

minimalReconnectInterval: 110ms

# A list of orderer endpoint addresses which should be overridden
# when found in channel configurations.
addressOverrides:
Expand Down

0 comments on commit dbf2467

Please sign in to comment.