From 5fa00ffc1130addd53e71179326b1d51cb604f26 Mon Sep 17 00:00:00 2001 From: Will Lahti Date: Thu, 4 Jan 2018 19:16:14 -0500 Subject: [PATCH] [FAB-7604] Peer deliver unusable when pol. not defined After FAB-7521, the peer deliver service is unusable because the BLOCKEVENT policy is not set by default. This CR uses the aclmgmt package, which will check for the policy and, if not set, use the default value (in this case, channel readers). It also restores the behave tests to their previous state to ensure peer deliver remains usable by default. Change-Id: I46e71853881271539e28a110ce8b81d3bd248d19 Signed-off-by: Will Lahti --- bddtests/features/bootstrap.feature | 4 +- common/config/api.go | 4 -- common/deliver/deliver.go | 21 +++--- common/deliver/deliver_test.go | 103 ++++++++++++---------------- core/peer/atomicbroadcast.go | 14 +--- core/peer/peer.go | 14 ---- orderer/common/server/server.go | 13 +++- peer/node/start.go | 8 ++- 8 files changed, 73 insertions(+), 108 deletions(-) diff --git a/bddtests/features/bootstrap.feature b/bddtests/features/bootstrap.feature index 6ddce48cd6a..b2fa0b0ec69 100644 --- a/bddtests/features/bootstrap.feature +++ b/bddtests/features/bootstrap.feature @@ -254,14 +254,14 @@ Feature: Bootstrap | ChainId | Start | End | | com.acme.blockchain.jdoe.channel1 | 0 | 0 | - Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "0" blocks with "0" messages within "1" seconds + Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "1" blocks with "1" messages within "1" seconds When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "peer2" using port "7051" And user "dev0Org0" sends deliver a seek request on orderer "peer2" with properties: | ChainId | Start | End | | com.acme.blockchain.jdoe.channel1 | 0 | 0 | - Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "0" blocks with "0" messages within "1" seconds + Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "1" blocks with "1" messages within "1" seconds # Entry point for invoking on an existing channel When user "peer0Admin" creates a chaincode spec "ccSpec" with name "example02" of type "GOLANG" for chaincode "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" with args diff --git a/common/config/api.go b/common/config/api.go index 0c281671974..c3bfaa7ec7f 100644 --- a/common/config/api.go +++ b/common/config/api.go @@ -6,7 +6,6 @@ SPDX-License-Identifier: Apache-2.0 package config import ( - "github.com/hyperledger/fabric/common/resourcesconfig" cb "github.com/hyperledger/fabric/protos/common" ) @@ -26,7 +25,4 @@ type Manager interface { // GetResourceConfig defines methods that are related to resource configuration GetResourceConfig(channel string) Config - - // GetPolicyMapper returns API to the policy mapper - GetPolicyMapper(channel string) resourcesconfig.PolicyMapper } diff --git a/common/deliver/deliver.go b/common/deliver/deliver.go index e62404e07a5..2271077d9a5 100644 --- a/common/deliver/deliver.go +++ b/common/deliver/deliver.go @@ -68,18 +68,19 @@ type Support interface { Errored() <-chan struct{} } -// PolicyNameProvider provides a policy name given the channel id -type PolicyNameProvider func(chainID string) (string, error) +// PolicyChecker checks the envelope against the policy logic supplied by the +// function +type PolicyChecker func(envelope *cb.Envelope, channelID string) error type deliverServer struct { sm SupportManager - policyProvider PolicyNameProvider + policyChecker PolicyChecker timeWindow time.Duration bindingInspector comm.BindingInspector } // NewHandlerImpl creates an implementation of the Handler interface -func NewHandlerImpl(sm SupportManager, policyProvider PolicyNameProvider, timeWindow time.Duration, mutualTLS bool) Handler { +func NewHandlerImpl(sm SupportManager, policyChecker PolicyChecker, timeWindow time.Duration, mutualTLS bool) Handler { // function to extract the TLS cert hash from a channel header extract := func(msg proto.Message) []byte { chdr, isChannelHeader := msg.(*cb.ChannelHeader) @@ -92,7 +93,7 @@ func NewHandlerImpl(sm SupportManager, policyProvider PolicyNameProvider, timeWi return &deliverServer{ sm: sm, - policyProvider: policyProvider, + policyChecker: policyChecker, timeWindow: timeWindow, bindingInspector: bindingInspector, } @@ -166,13 +167,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env lastConfigSequence := chain.Sequence() - policyName, err := ds.policyProvider(chdr.ChannelId) - if err != nil { - logger.Warningf("[channel: %s] failed to obtain policy name due to %s", chdr.ChannelId, err) - return sendStatusReply(srv, cb.Status_BAD_REQUEST) - } - sf := NewSigFilter(policyName, chain) - if err := sf.Apply(envelope); err != nil { + if err := ds.policyChecker(envelope, chdr.ChannelId); err != nil { logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err) return sendStatusReply(srv, cb.Status_FORBIDDEN) } @@ -225,7 +220,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env currentConfigSequence := chain.Sequence() if currentConfigSequence > lastConfigSequence { lastConfigSequence = currentConfigSequence - if err := sf.Apply(envelope); err != nil { + if err := ds.policyChecker(envelope, chdr.ChannelId); err != nil { logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err) return sendStatusReply(srv, cb.Status_FORBIDDEN) } diff --git a/common/deliver/deliver_test.go b/common/deliver/deliver_test.go index 6e899a3406b..e41d7ee93e1 100644 --- a/common/deliver/deliver_test.go +++ b/common/deliver/deliver_test.go @@ -45,10 +45,6 @@ import ( var genesisBlock = cb.NewBlock(0, nil) var systemChainID = "systemChain" -var policyNameProvider = func(_ string) (string, error) { - return policies.ChannelReaders, nil -} - var timeWindow = time.Duration(15 * time.Minute) var testCert = &x509.Certificate{ Raw: []byte("test"), @@ -172,14 +168,24 @@ func NewRAMLedger() blockledger.ReadWriter { return rl } -func initializeDeliverHandler() Handler { - mm := newMockMultichainManager() - for i := 1; i < ledgerSize; i++ { - l := mm.chains[systemChainID].ledger - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) +func initializeDeliverHandler(mm *mockSupportManager, mutualTLS bool) Handler { + if mm == nil { + mm = newMockMultichainManager() + ms := mm.chains[systemChainID] + l := ms.ledger + for i := 1; i < ledgerSize; i++ { + l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) + } } - - return NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + policyChecker := func(env *cb.Envelope, channelID string) error { + chain, ok := mm.GetChain(channelID) + if !ok { + return fmt.Errorf("channel %s not found", channelID) + } + sf := NewSigFilter(policies.ChannelReaders, chain) + return sf.Apply(env) + } + return NewHandlerImpl(mm, policyChecker, timeWindow, mutualTLS) } func newMockMultichainManager() *mockSupportManager { @@ -237,7 +243,7 @@ func TestWholeChainSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -269,7 +275,7 @@ func TestNewestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -291,7 +297,7 @@ func TestSpecificSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) specifiedStart := uint64(3) @@ -328,7 +334,7 @@ func TestUnauthorizedSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -337,7 +343,7 @@ func TestUnauthorizedSeek(t *testing.T) { select { case deliverReply := <-m.sendChan: if deliverReply.GetStatus() != cb.Status_FORBIDDEN { - t.Fatalf("Received wrong error on the reply channel") + t.Fatalf("Received wrong error on the reply channel: %s", deliverReply.GetStatus()) } case <-time.After(time.Second): t.Fatalf("Timed out waiting to get all blocks") @@ -353,7 +359,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -384,7 +390,7 @@ func TestOutOfBoundSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(3 * ledgerSize)), Stop: seekSpecified(uint64(3 * ledgerSize)), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -403,7 +409,7 @@ func TestFailFastSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_FAIL_IF_NOT_READY}) @@ -436,7 +442,7 @@ func TestBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -490,7 +496,7 @@ func TestErroredSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -514,7 +520,7 @@ func TestErroredBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) @@ -539,7 +545,7 @@ func TestErroredBlockingSeek(t *testing.T) { func TestSGracefulShutdown(t *testing.T) { m := newMockD() - ds := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(nil, !mutualTLS) close(m.recvChan) assert.NoError(t, ds.Handle(m), "Expected no error for hangup") @@ -549,7 +555,7 @@ func TestReversedSeqSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) specifiedStart := uint64(7) @@ -567,13 +573,13 @@ func TestReversedSeqSeek(t *testing.T) { } func TestBadStreamRecv(t *testing.T) { - bh := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS) + bh := initializeDeliverHandler(nil, !mutualTLS) assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error") } func TestBadStreamSend(t *testing.T) { m := &erroneousSendMockD{recvVal: makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})} - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) assert.Error(t, ds.Handle(m), "Should catch unexpected stream error") } @@ -581,7 +587,7 @@ func TestOldestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekOldest, Stop: seekOldest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -599,7 +605,7 @@ func TestNoPayloadSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: []byte("Foo")} @@ -616,7 +622,7 @@ func TestNilPayloadHeaderSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})} @@ -633,7 +639,7 @@ func TestBadChannelHeader(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{ @@ -656,7 +662,7 @@ func TestChainNotFound(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS) + ds := initializeDeliverHandler(mm, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -673,7 +679,7 @@ func TestBadSeekInfoPayload(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -701,7 +707,7 @@ func TestMissingSeekPosition(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -729,7 +735,7 @@ func TestNilTimestamp(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -756,7 +762,7 @@ func TestTimestampOutOfTimeWindow(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := initializeDeliverHandler() + ds := initializeDeliverHandler(nil, !mutualTLS) go ds.Handle(m) m.recvChan <- &cb.Envelope{ @@ -781,17 +787,10 @@ func TestTimestampOutOfTimeWindow(t *testing.T) { } func TestSeekWithMutualTLS(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) m.recvChan <- makeSeekWithTLSCertHash(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, testCert) @@ -810,17 +809,10 @@ func TestSeekWithMutualTLS(t *testing.T) { } func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) wrongCert := &x509.Certificate{ Raw: []byte("wrong"), @@ -836,17 +828,10 @@ func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) { } func TestSeekWithMutualTLS_noTLSCert(t *testing.T) { - mm := newMockMultichainManager() - ms := mm.chains[systemChainID] - l := ms.ledger - for i := 1; i < ledgerSize; i++ { - l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) - } - m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS) + ds := initializeDeliverHandler(nil, mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) diff --git a/core/peer/atomicbroadcast.go b/core/peer/atomicbroadcast.go index f10a50a03c1..83e9741ff10 100644 --- a/core/peer/atomicbroadcast.go +++ b/core/peer/atomicbroadcast.go @@ -16,17 +16,14 @@ limitations under the License. package peer import ( - "fmt" "runtime/debug" "time" "github.com/hyperledger/fabric/common/deliver" "github.com/hyperledger/fabric/common/flogging" - "github.com/hyperledger/fabric/core/aclmgmt/resources" "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/op/go-logging" - "github.com/pkg/errors" ) const pkgLogID = "common/peer" @@ -62,16 +59,9 @@ func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { // NewAtomicBroadcastServer creates an ab.AtomicBroadcastServer based on the // ledger Reader. Broadcast is not implemented/supported on the peer. -func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { - configSupport := NewConfigSupport() +func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool, policyChecker deliver.PolicyChecker) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(DeliverSupportManager{}, func(chainID string) (string, error) { - policyMapper := configSupport.GetPolicyMapper(chainID) - if policyMapper == nil { - return "", errors.New(fmt.Sprintf("cannot find policy mapper for channel %s", chainID)) - } - return policyMapper.PolicyRefForAPI(resources.BLOCKEVENT), nil - }, timeWindow, mutualTLS), + dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policyChecker, timeWindow, mutualTLS), } return s } diff --git a/core/peer/peer.go b/core/peer/peer.go index 6f278974be4..f16c4a5d9eb 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -809,17 +809,3 @@ func (*configSupport) GetResourceConfig(channel string) cc.Config { } return chain.cs.bundleSource.ConfigtxValidator() } - -// GetPolicyMapper returns an instance of a object that represents -// an API policy mapper which provides a mapping from specific API -// function to its policy -func (*configSupport) GetPolicyMapper(channel string) resourcesconfig.PolicyMapper { - chains.RLock() - defer chains.RUnlock() - chain := chains.list[channel] - if chain == nil { - peerLogger.Error("GetPolicyMapper: channel", channel, "not found in the list of channels associated with this peer") - return nil - } - return chain.cs.bundleSource.APIPolicyMapper() -} diff --git a/orderer/common/server/server.go b/orderer/common/server/server.go index 4de03c36e7f..5b2d2ca59ad 100644 --- a/orderer/common/server/server.go +++ b/orderer/common/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/multichannel" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + "github.com/pkg/errors" ) type broadcastSupport struct { @@ -49,9 +50,15 @@ type server struct { // NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, func(_ string) (string, error) { - return policies.ChannelReaders, nil - }, timeWindow, mutualTLS), + dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, + func(env *cb.Envelope, channelID string) error { + chain, ok := r.GetChain(channelID) + if !ok { + return errors.Errorf("channel %s not found", channelID) + } + sf := deliver.NewSigFilter(policies.ChannelReaders, chain) + return sf.Apply(env) + }, timeWindow, mutualTLS), bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}), debug: debug, } diff --git a/peer/node/start.go b/peer/node/start.go index d9254b33560..7c2d38e3f9e 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -23,6 +23,8 @@ import ( "github.com/hyperledger/fabric/common/localmsp" "github.com/hyperledger/fabric/common/viperutil" "github.com/hyperledger/fabric/core" + "github.com/hyperledger/fabric/core/aclmgmt" + "github.com/hyperledger/fabric/core/aclmgmt/resources" "github.com/hyperledger/fabric/core/chaincode" "github.com/hyperledger/fabric/core/chaincode/accesscontrol" "github.com/hyperledger/fabric/core/comm" @@ -42,6 +44,7 @@ import ( "github.com/hyperledger/fabric/peer/common" peergossip "github.com/hyperledger/fabric/peer/gossip" "github.com/hyperledger/fabric/peer/version" + cb "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/ledger/rwset" ab "github.com/hyperledger/fabric/protos/orderer" pb "github.com/hyperledger/fabric/protos/peer" @@ -175,7 +178,10 @@ func serve(args []string) error { // broadcast mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert timeWindow := viper.GetDuration("peer.authentication.timewindow") - abServer := peer.NewAtomicBroadcastServer(timeWindow, mutualTLS) + policyChecker := func(env *cb.Envelope, channelID string) error { + return aclmgmt.GetACLProvider().CheckACL(resources.BLOCKEVENT, channelID, env) + } + abServer := peer.NewAtomicBroadcastServer(timeWindow, mutualTLS, policyChecker) ab.RegisterAtomicBroadcastServer(peerServer.Server(), abServer) // enable the cache of chaincode info