Skip to content

Commit

Permalink
[FAB-5269] Rm filter committers entirely
Browse files Browse the repository at this point in the history
The filter committers are no longer needed, as the blockcutter no longer
returns them.

This CR moves the small bit of remaining logic from the filter
committers into chainsupport.go and then removes the entire concept of
the filter.Committer from the codebase.

Change-Id: I4471ebe4b26c17669afc53fcef30b4a393276f14
Signed-off-by: Jason Yellick <[email protected]>
  • Loading branch information
Jason Yellick committed Jul 27, 2017
1 parent f5e25a3 commit 9018aea
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 217 deletions.
14 changes: 3 additions & 11 deletions orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package blockcutter

import (
"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"

"github.com/op/go-logging"
Expand Down Expand Up @@ -53,10 +52,9 @@ type receiver struct {
sharedConfigManager config.Orderer
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
pendingCommitters []filter.Committer
}

// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager and filters
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager
func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver {
return &receiver{
sharedConfigManager: sharedConfigManager,
Expand All @@ -67,17 +65,11 @@ func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver {
// If the current message valid, and no batches need to be cut:
// - Ordered will return nil, nil, and true (indicating ok).
// If the current message valid, and batches need to be cut:
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok).
// - Ordered will return 1 or 2 batches of messages and true (indicating ok).
// If the current message is invalid:
// - Ordered will return nil, nil, and false (to indicate not ok).
//
// Given a valid message, if the current message needs to be isolated (as determined during filtering).
// - Ordered will return:
// * The pending batch of (if not empty), and a second batch containing only the isolated message.
// * The corresponding batches of committers.
// * true (indicating ok).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// Given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
Expand Down
39 changes: 9 additions & 30 deletions orderer/common/configtxfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package configtxfilter

import (
"fmt"

"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/orderer/common/filter"
Expand All @@ -30,61 +28,42 @@ type configFilter struct {
configManager api.Manager
}

// New creates a new configfilter Rule based on the given Manager
// NewFilter creates a new configfilter Rule based on the given Manager
func NewFilter(manager api.Manager) filter.Rule {
return &configFilter{
configManager: manager,
}
}

type configCommitter struct {
manager api.Manager
configEnvelope *cb.ConfigEnvelope
}

func (cc *configCommitter) Commit() {
err := cc.manager.Apply(cc.configEnvelope)
if err != nil {
panic(fmt.Errorf("Could not apply config transaction which should have already been validated: %s", err))
}
}

func (cc *configCommitter) Isolated() bool {
return true
}

// Apply applies the rule to the given Envelope, replying with the Action to take for the message
func (cf *configFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
func (cf *configFilter) Apply(message *cb.Envelope) filter.Action {
msgData, err := utils.UnmarshalPayload(message.Payload)
if err != nil {
return filter.Forward, nil
return filter.Forward
}

if msgData.Header == nil /* || msgData.Header.ChannelHeader == nil */ {
return filter.Forward, nil
return filter.Forward
}

chdr, err := utils.UnmarshalChannelHeader(msgData.Header.ChannelHeader)
if err != nil {
return filter.Forward, nil
return filter.Forward
}

if chdr.Type != int32(cb.HeaderType_CONFIG) {
return filter.Forward, nil
return filter.Forward
}

configEnvelope, err := configtx.UnmarshalConfigEnvelope(msgData.Data)
if err != nil {
return filter.Reject, nil
return filter.Reject
}

err = cf.configManager.Validate(configEnvelope)
if err != nil {
return filter.Reject, nil
return filter.Reject
}

return filter.Accept, &configCommitter{
manager: cf.configManager,
configEnvelope: configEnvelope,
}
return filter.Accept
}
50 changes: 7 additions & 43 deletions orderer/common/configtxfilter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (

func TestForwardOpaquePayload(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: []byte("Opaque"),
})
assert.EqualValues(t, filter.Forward, result, "Should have forwarded opaque message")
}

func TestForwardNilHeader(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: nil,
}),
Expand All @@ -49,7 +49,7 @@ func TestForwardNilHeader(t *testing.T) {

func TestForwardBadHeader(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{ChannelHeader: []byte("Hello, world!")},
}),
Expand All @@ -59,7 +59,7 @@ func TestForwardBadHeader(t *testing.T) {

func TestForwardNonConfig(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{ChannelHeader: []byte{}},
}),
Expand All @@ -69,7 +69,7 @@ func TestForwardNonConfig(t *testing.T) {

func TestRejectMalformedData(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
Expand Down Expand Up @@ -107,51 +107,15 @@ func TestAcceptGoodConfig(t *testing.T) {
configEnvelope := &cb.Envelope{
Payload: configBytes,
}
result, committer := cf.Apply(configEnvelope)
result := cf.Apply(configEnvelope)
assert.EqualValues(t, filter.Accept, result, "Should have indicated a good config message causes a reconfig")
assert.True(t, committer.Isolated(), "Config transactions should be isolated to their own block")

committer.Commit()
assert.Equal(t, mcm.AppliedConfigUpdateEnvelope, configEnv, "Should have applied new config on commit got %+v and %+v", mcm.AppliedConfigUpdateEnvelope, configEnv.LastUpdate)
}

func TestPanicApplyingValidatedConfig(t *testing.T) {
mcm := &mockconfigtx.Manager{ApplyVal: fmt.Errorf("Error applying config tx")}
cf := NewFilter(mcm)
configGroup := cb.NewConfigGroup()
configGroup.Values["Foo"] = &cb.ConfigValue{}
configUpdateEnv := &cb.ConfigUpdateEnvelope{
ConfigUpdate: utils.MarshalOrPanic(configGroup),
}
configEnv := &cb.ConfigEnvelope{
LastUpdate: &cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
Type: int32(cb.HeaderType_CONFIG_UPDATE),
}),
},
Data: utils.MarshalOrPanic(configUpdateEnv),
}),
},
}
configEnvBytes := utils.MarshalOrPanic(configEnv)
configBytes := utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{Type: int32(cb.HeaderType_CONFIG)})}, Data: configEnvBytes})
configEnvelope := &cb.Envelope{
Payload: configBytes,
}
result, committer := cf.Apply(configEnvelope)

assert.EqualValues(t, filter.Accept, result, "Should have indicated a good config message causes a reconfig")
assert.True(t, committer.Isolated(), "Config transactions should be isolated to their own block")
assert.Panics(t, func() { committer.Commit() }, "Should panic upon error applying a validated config tx")
}

func TestRejectBadConfig(t *testing.T) {
cf := NewFilter(&mockconfigtx.Manager{ValidateVal: fmt.Errorf("Error")})
config, _ := proto.Marshal(&cb.ConfigEnvelope{})
configBytes, _ := proto.Marshal(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{Type: int32(cb.HeaderType_CONFIG)})}, Data: config})
result, _ := cf.Apply(&cb.Envelope{
result := cf.Apply(&cb.Envelope{
Payload: configBytes,
})

Expand Down
6 changes: 2 additions & 4 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
lastConfigSequence := chain.Sequence()

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
if sf.Apply(envelope) != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
Expand Down Expand Up @@ -186,8 +185,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
if sf.Apply(envelope) != filter.Forward {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
Expand Down
42 changes: 12 additions & 30 deletions orderer/common/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,28 @@ const (
// Rule defines a filter function which accepts, rejects, or forwards (to the next rule) an Envelope
type Rule interface {
// Apply applies the rule to the given Envelope, replying with the Action to take for the message
// If the filter Accepts a message, it should provide a committer to use when writing the message to the chain
Apply(message *ab.Envelope) (Action, Committer)
Apply(message *ab.Envelope) Action
}

// Committer is returned by postfiltering and should be invoked once the message has been written to the blockchain
type Committer interface {
// Commit performs whatever action should be performed upon committing of a message
Commit()

// Isolated returns whether this transaction should have a block to itself or may be mixed with other transactions
Isolated() bool
}

type noopCommitter struct{}

func (nc noopCommitter) Commit() {}
func (nc noopCommitter) Isolated() bool { return false }

// NoopCommitter does nothing on commit and is not isolated
var NoopCommitter = Committer(noopCommitter{})

// EmptyRejectRule rejects empty messages
var EmptyRejectRule = Rule(emptyRejectRule{})

type emptyRejectRule struct{}

func (a emptyRejectRule) Apply(message *ab.Envelope) (Action, Committer) {
func (a emptyRejectRule) Apply(message *ab.Envelope) Action {
if message.Payload == nil {
return Reject, nil
return Reject
}
return Forward, nil
return Forward
}

// AcceptRule always returns Accept as a result for Apply
var AcceptRule = Rule(acceptRule{})

type acceptRule struct{}

func (a acceptRule) Apply(message *ab.Envelope) (Action, Committer) {
return Accept, NoopCommitter
func (a acceptRule) Apply(message *ab.Envelope) Action {
return Accept
}

// RuleSet is used to apply a collection of rules
Expand All @@ -91,17 +73,17 @@ func NewRuleSet(rules []Rule) *RuleSet {
}
}

// Apply applies the rules given for this set in order, returning the committer, nil on valid, or nil, err on invalid
func (rs *RuleSet) Apply(message *ab.Envelope) (Committer, error) {
// Apply applies the rules given for this set in order, returning nil on valid or err on invalid
func (rs *RuleSet) Apply(message *ab.Envelope) error {
for _, rule := range rs.rules {
action, committer := rule.Apply(message)
action := rule.Apply(message)
switch action {
case Accept:
return committer, nil
return nil
case Reject:
return nil, fmt.Errorf("Rejected by rule: %T", rule)
return fmt.Errorf("Rejected by rule: %T", rule)
default:
}
}
return nil, fmt.Errorf("No matching filter found")
return fmt.Errorf("No matching filter found")
}
Loading

0 comments on commit 9018aea

Please sign in to comment.