Skip to content

Commit

Permalink
[FAB-5265] Rm blockcutter message validation
Browse files Browse the repository at this point in the history
The primary goal of the series in FAB-5258 is to prevent all OSNs from
having to validate all messages for all channels.  Since all messages
pass through the block cutter, the block cutter cannot be involved in
message validation as it currently is.

This CR pulls the message validation out of the blockcutter and pushes
it into the msgprocessor definitions.  Ultimately, the msgprocessor
interfaces will only be called if necessary, eliminating the performance
bottleneck.

Change-Id: I3c0d41e47873aa6e764c70fd176722306f00655c
Signed-off-by: Jason Yellick <[email protected]>
  • Loading branch information
Jason Yellick committed Jul 27, 2017
1 parent 258b25c commit ed9517e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 144 deletions.
16 changes: 2 additions & 14 deletions orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,15 @@ type Receiver interface {

type receiver struct {
sharedConfigManager config.Orderer
filters *filter.RuleSet
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
pendingCommitters []filter.Committer
}

// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager and filters
func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet) Receiver {
func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver {
return &receiver{
sharedConfigManager: sharedConfigManager,
filters: filters,
}
}

Expand All @@ -83,17 +81,7 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
committer, err := r.filters.Apply(msg)
if err != nil {
logger.Debugf("Rejecting message: %s", err)
return nil, false
}

if committer.Isolated() {
logger.Panicf("The use of isolated committers has been deprecated and should no longer appear in this path")
}

// The messages are not filtered a second time, this is pushed onto the Consenter
messageSizeBytes := messageSizeBytes(msg)

if messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
Expand Down
121 changes: 3 additions & 118 deletions orderer/common/blockcutter/blockcutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ limitations under the License.
package blockcutter

import (
"bytes"
"testing"

mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

Expand All @@ -33,59 +31,14 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

type isolatedCommitter struct{}

func (ic isolatedCommitter) Isolated() bool { return true }

func (ic isolatedCommitter) Commit() {}

type mockIsolatedFilter struct{}

func (mif *mockIsolatedFilter) Apply(msg *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(msg.Payload, isolatedTx.Payload) {
return filter.Accept, isolatedCommitter{}
}
return filter.Forward, nil
}

type mockRejectFilter struct{}

func (mrf mockRejectFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, badTx.Payload) {
return filter.Reject, nil
}
return filter.Forward, nil
}

type mockAcceptFilter struct{}

func (mrf mockAcceptFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, goodTx.Payload) {
return filter.Accept, filter.NoopCommitter
}
return filter.Forward, nil
}

func getFilters() *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
&mockIsolatedFilter{},
&mockRejectFilter{},
&mockAcceptFilter{},
})
}

var badTx = &cb.Envelope{Payload: []byte("BAD")}
var goodTx = &cb.Envelope{Payload: []byte("GOOD")}
var goodTxLarge = &cb.Envelope{Payload: []byte("GOOD"), Signature: make([]byte, 1000)}
var isolatedTx = &cb.Envelope{Payload: []byte("ISOLATED")}
var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}

func TestNormalBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}})

batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
Expand All @@ -96,73 +49,7 @@ func TestNormalBatch(t *testing.T) {
assert.True(t, ok, "Should have enqueued second message into batch")
}

func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, ok := r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued bad message into batch")

batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")

batches, ok = r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}

func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, ok := r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued unmatched message into batch")

batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")

batches, ok = r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch from unmatched message")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}

func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}

func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")

assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}

func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
filters := getFilters()

goodTxBytes := messageSizeBytes(goodTx)

// set preferred max bytes such that 10 goodTx will not fit
Expand All @@ -171,7 +58,7 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
// set message count > 9
maxMessageCount := uint32(20)

r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}})

// enqueue 9 messages
for i := 0; i < 9; i++ {
Expand All @@ -194,8 +81,6 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
}

func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
filters := getFilters()

goodTxLargeBytes := messageSizeBytes(goodTxLarge)

// set preferred max bytes such that 1 goodTxLarge will not fit
Expand All @@ -204,7 +89,7 @@ func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
// set message count > 1
maxMessageCount := uint32(20)

r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}})

// submit large message
batches, ok := r.Ordered(goodTxLarge)
Expand Down
2 changes: 1 addition & 1 deletion orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newChainSupport(
signer crypto.LocalSigner,
) *ChainSupport {

cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig())
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,27 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co
}
switch class {
case msgprocessor.ConfigUpdateMsg:
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}

_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
break
}

batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}
block := support.CreateNextBlock([]*cb.Envelope{env})
support.WriteConfigBlock(block, nil)
*timer = nil
case msgprocessor.NormalMsg:
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
break
}

batches, ok := support.BlockCutter().Ordered(env)
logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v", support.ChainID(), len(batches), ok)
if ok && len(batches) == 0 && *timer == nil {
Expand Down
17 changes: 12 additions & 5 deletions orderer/consensus/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,28 @@ func (ch *chain) main() {
}
switch class {
case msgprocessor.ConfigUpdateMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}

batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil)
}

_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
ch.support.WriteConfigBlock(block, nil)
timer = nil
case msgprocessor.NormalMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}

batches, ok := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.support.SharedConfig().BatchTimeout())
Expand Down

0 comments on commit ed9517e

Please sign in to comment.