From 0a0088714dc55bf495d63c70a31c3be217199c97 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Sat, 27 Jan 2024 19:29:21 -0500 Subject: [PATCH] Fix P-chain validator set lookup race condition --- network/p2p/client.go | 19 ++++----- network/p2p/gossip/gossip.go | 29 ++++--------- network/p2p/gossip/handler.go | 31 ++++---------- network/p2p/gossip/message.go | 59 ++++++++++++++++++++++++++ network/p2p/network.go | 6 ++- network/p2p/router.go | 25 ++++++++--- vms/platformvm/network/network.go | 6 +-- vms/platformvm/vm.go | 5 ++- vms/platformvm/vm_regression_test.go | 62 ++++++++++++++++++++++++++++ vms/platformvm/vm_test.go | 3 +- 10 files changed, 180 insertions(+), 65 deletions(-) create mode 100644 network/p2p/gossip/message.go diff --git a/network/p2p/client.go b/network/p2p/client.go index 6107f8abb7d9..b506baf9c630 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -75,7 +75,7 @@ func (c *Client) AppRequest( c.router.lock.Lock() defer c.router.lock.Unlock() - appRequestBytes = c.prefixMessage(appRequestBytes) + appRequestBytes = PrefixMessage(c.handlerPrefix, appRequestBytes) for nodeID := range nodeIDs { requestID := c.router.requestID if _, ok := c.router.pendingAppRequests[requestID]; ok { @@ -112,7 +112,7 @@ func (c *Client) AppGossip( ) error { return c.sender.SendAppGossip( ctx, - c.prefixMessage(appGossipBytes), + PrefixMessage(c.handlerPrefix, appGossipBytes), ) } @@ -125,7 +125,7 @@ func (c *Client) AppGossipSpecific( return c.sender.SendAppGossipSpecific( ctx, nodeIDs, - c.prefixMessage(appGossipBytes), + PrefixMessage(c.handlerPrefix, appGossipBytes), ) } @@ -153,7 +153,7 @@ func (c *Client) CrossChainAppRequest( ctx, chainID, requestID, - c.prefixMessage(appRequestBytes), + PrefixMessage(c.handlerPrefix, appRequestBytes), ); err != nil { return err } @@ -167,15 +167,14 @@ func (c *Client) CrossChainAppRequest( return nil } -// prefixMessage prefixes the original message with the handler identifier -// corresponding to this client. +// PrefixMessage prefixes the original message with the protocol identifier. // // Only gossip and request messages need to be prefixed. // Response messages don't need to be prefixed because request ids are tracked // which map to the expected response handler. -func (c *Client) prefixMessage(src []byte) []byte { - messageBytes := make([]byte, len(c.handlerPrefix)+len(src)) - copy(messageBytes, c.handlerPrefix) - copy(messageBytes[len(c.handlerPrefix):], src) +func PrefixMessage(prefix, msg []byte) []byte { + messageBytes := make([]byte, len(prefix)+len(msg)) + copy(messageBytes, prefix) + copy(messageBytes[len(prefix):], msg) return messageBytes } diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index 97e90e6e99af..ab90e593b5a3 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -14,11 +14,8 @@ import ( "go.uber.org/zap" - "google.golang.org/protobuf/proto" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/proto/pb/sdk" "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/buffer" "github.com/ava-labs/avalanchego/utils/logging" @@ -151,12 +148,7 @@ type PullGossiper[T Gossipable] struct { } func (p *PullGossiper[_]) Gossip(ctx context.Context) error { - bloom, salt := p.set.GetFilter() - request := &sdk.PullGossipRequest{ - Filter: bloom, - Salt: salt, - } - msgBytes, err := proto.Marshal(request) + msgBytes, err := MarshalAppRequest(p.set.GetFilter()) if err != nil { return err } @@ -186,14 +178,14 @@ func (p *PullGossiper[_]) handleResponse( return } - response := &sdk.PullGossipResponse{} - if err := proto.Unmarshal(responseBytes, response); err != nil { + gossip, err := ParseAppResponse(responseBytes) + if err != nil { p.log.Debug("failed to unmarshal gossip response", zap.Error(err)) return } receivedBytes := 0 - for _, bytes := range response.Gossip { + for _, bytes := range gossip { receivedBytes += len(bytes) gossipable, err := p.marshaller.UnmarshalGossip(bytes) @@ -235,7 +227,7 @@ func (p *PullGossiper[_]) handleResponse( return } - receivedCountMetric.Add(float64(len(response.Gossip))) + receivedCountMetric.Add(float64(len(gossip))) receivedBytesMetric.Add(float64(receivedBytes)) } @@ -270,11 +262,8 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error { return nil } - msg := &sdk.PushGossip{ - Gossip: make([][]byte, 0, p.pending.Len()), - } - sentBytes := 0 + gossip := make([][]byte, 0, p.pending.Len()) for sentBytes < p.targetGossipSize { gossipable, ok := p.pending.PeekLeft() if !ok { @@ -288,12 +277,12 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error { return err } - msg.Gossip = append(msg.Gossip, bytes) + gossip = append(gossip, bytes) sentBytes += len(bytes) p.pending.PopLeft() } - msgBytes, err := proto.Marshal(msg) + msgBytes, err := MarshalAppGossip(gossip) if err != nil { return err } @@ -308,7 +297,7 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error { return fmt.Errorf("failed to get sent bytes metric: %w", err) } - sentCountMetric.Add(float64(len(msg.Gossip))) + sentCountMetric.Add(float64(len(gossip))) sentBytesMetric.Add(float64(sentBytes)) return p.client.AppGossip(ctx, msgBytes) diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 15ef1fe16684..38e883926366 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -10,11 +10,8 @@ import ( "go.uber.org/zap" - "google.golang.org/protobuf/proto" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/proto/pb/sdk" "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/logging" ) @@ -51,17 +48,7 @@ type Handler[T Gossipable] struct { } func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) { - request := &sdk.PullGossipRequest{} - if err := proto.Unmarshal(requestBytes, request); err != nil { - return nil, err - } - - salt, err := ids.ToID(request.Salt) - if err != nil { - return nil, err - } - - filter, err := bloom.Parse(request.Filter) + filter, salt, err := ParseAppRequest(requestBytes) if err != nil { return nil, err } @@ -94,10 +81,6 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req return nil, err } - response := &sdk.PullGossipResponse{ - Gossip: gossipBytes, - } - sentCountMetric, err := h.metrics.sentCount.GetMetricWith(pullLabels) if err != nil { return nil, fmt.Errorf("failed to get sent count metric: %w", err) @@ -108,21 +91,21 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req return nil, fmt.Errorf("failed to get sent bytes metric: %w", err) } - sentCountMetric.Add(float64(len(response.Gossip))) + sentCountMetric.Add(float64(len(gossipBytes))) sentBytesMetric.Add(float64(responseSize)) - return proto.Marshal(response) + return MarshalAppResponse(gossipBytes) } func (h Handler[_]) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) { - msg := &sdk.PushGossip{} - if err := proto.Unmarshal(gossipBytes, msg); err != nil { + gossip, err := ParseAppGossip(gossipBytes) + if err != nil { h.log.Debug("failed to unmarshal gossip", zap.Error(err)) return } receivedBytes := 0 - for _, bytes := range msg.Gossip { + for _, bytes := range gossip { receivedBytes += len(bytes) gossipable, err := h.marshaller.UnmarshalGossip(bytes) if err != nil { @@ -164,6 +147,6 @@ func (h Handler[_]) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipByte return } - receivedCountMetric.Add(float64(len(msg.Gossip))) + receivedCountMetric.Add(float64(len(gossip))) receivedBytesMetric.Add(float64(receivedBytes)) } diff --git a/network/p2p/gossip/message.go b/network/p2p/gossip/message.go new file mode 100644 index 000000000000..47e6784e43d8 --- /dev/null +++ b/network/p2p/gossip/message.go @@ -0,0 +1,59 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/utils/bloom" +) + +func MarshalAppRequest(filter, salt []byte) ([]byte, error) { + request := &sdk.PullGossipRequest{ + Filter: filter, + Salt: salt, + } + return proto.Marshal(request) +} + +func ParseAppRequest(bytes []byte) (*bloom.ReadFilter, ids.ID, error) { + request := &sdk.PullGossipRequest{} + if err := proto.Unmarshal(bytes, request); err != nil { + return nil, ids.Empty, err + } + + salt, err := ids.ToID(request.Salt) + if err != nil { + return nil, ids.Empty, err + } + + filter, err := bloom.Parse(request.Filter) + return filter, salt, err +} + +func MarshalAppResponse(gossip [][]byte) ([]byte, error) { + return proto.Marshal(&sdk.PullGossipResponse{ + Gossip: gossip, + }) +} + +func ParseAppResponse(bytes []byte) ([][]byte, error) { + response := &sdk.PullGossipResponse{} + err := proto.Unmarshal(bytes, response) + return response.Gossip, err +} + +func MarshalAppGossip(gossip [][]byte) ([]byte, error) { + return proto.Marshal(&sdk.PushGossip{ + Gossip: gossip, + }) +} + +func ParseAppGossip(bytes []byte) ([][]byte, error) { + msg := &sdk.PushGossip{} + err := proto.Unmarshal(bytes, msg) + return msg.Gossip, err +} diff --git a/network/p2p/network.go b/network/p2p/network.go index 604d06db617f..a98579c44183 100644 --- a/network/p2p/network.go +++ b/network/p2p/network.go @@ -217,7 +217,7 @@ func (n *Network) NewClient(handlerID uint64, options ...ClientOption) *Client { client := &Client{ handlerID: handlerID, handlerIDStr: strconv.FormatUint(handlerID, 10), - handlerPrefix: binary.AppendUvarint(nil, handlerID), + handlerPrefix: ProtocolPrefix(handlerID), sender: n.sender, router: n.router, options: &clientOptions{ @@ -281,3 +281,7 @@ type peerSampler struct { func (p peerSampler) Sample(_ context.Context, limit int) []ids.NodeID { return p.peers.Sample(limit) } + +func ProtocolPrefix(handlerID uint64) []byte { + return binary.AppendUvarint(nil, handlerID) +} diff --git a/network/p2p/router.go b/network/p2p/router.go index 82fdbf24fbc3..13a38abc56c5 100644 --- a/network/p2p/router.go +++ b/network/p2p/router.go @@ -399,18 +399,19 @@ func (r *router) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requ // - A boolean indicating that parsing succeeded. // // Invariant: Assumes [r.lock] isn't held. -func (r *router) parse(msg []byte) ([]byte, *meteredHandler, string, bool) { - handlerID, bytesRead := binary.Uvarint(msg) - if bytesRead <= 0 { +func (r *router) parse(prefixedMsg []byte) ([]byte, *meteredHandler, string, bool) { + handlerID, msg, ok := ParseMessage(prefixedMsg) + if !ok { return nil, nil, "", false } + handlerStr := strconv.FormatUint(handlerID, 10) + r.lock.RLock() defer r.lock.RUnlock() - handlerStr := strconv.FormatUint(handlerID, 10) handler, ok := r.handlers[handlerID] - return msg[bytesRead:], handler, handlerStr, ok + return msg, handler, handlerStr, ok } // Invariant: Assumes [r.lock] isn't held. @@ -432,3 +433,17 @@ func (r *router) clearCrossChainAppRequest(requestID uint32) (pendingCrossChainA delete(r.pendingCrossChainAppRequests, requestID) return callback, ok } + +// Parse a gossip or request message. +// +// Returns: +// - The protocol ID. +// - The unprefixed protocol message. +// - A boolean indicating that parsing succeeded. +func ParseMessage(msg []byte) (uint64, []byte, bool) { + handlerID, bytesRead := binary.Uvarint(msg) + if bytesRead <= 0 { + return 0, nil, false + } + return handlerID, msg[bytesRead:], true +} diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index dbce242ffebd..39f6ee1dea0e 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -24,7 +24,7 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" ) -const txGossipHandlerID = 0 +const TxGossipHandlerID = 0 type Network interface { common.AppHandler @@ -80,7 +80,7 @@ func New( config.MaxValidatorSetStaleness, ) txGossipClient := p2pNetwork.NewClient( - txGossipHandlerID, + TxGossipHandlerID, p2p.WithValidatorSampling(validators), ) txGossipMetrics, err := gossip.NewMetrics(registerer, "tx") @@ -154,7 +154,7 @@ func New( appRequestHandler: validatorHandler, } - if err := p2pNetwork.AddHandler(txGossipHandlerID, txGossipHandler); err != nil { + if err := p2pNetwork.AddHandler(TxGossipHandlerID, txGossipHandler); err != nil { return nil, err } diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index 8c4b527e4539..6e38d803e870 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -205,7 +205,10 @@ func (vm *VM) Initialize( chainCtx.Log, chainCtx.NodeID, chainCtx.SubnetID, - chainCtx.ValidatorState, + validators.NewLockedState( + &chainCtx.Lock, + validatorManager, + ), txVerifier, mempool, txExecutorBackend.Config.PartialSyncPrimaryNetwork, diff --git a/vms/platformvm/vm_regression_test.go b/vms/platformvm/vm_regression_test.go index 52993d219632..0a87758fdb1b 100644 --- a/vms/platformvm/vm_regression_test.go +++ b/vms/platformvm/vm_regression_test.go @@ -14,26 +14,33 @@ import ( "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "github.com/ava-labs/avalanchego/chains" "github.com/ava-labs/avalanchego/chains/atomic" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/uptime" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/timer/mockable" + "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/components/avax" "github.com/ava-labs/avalanchego/vms/platformvm/block" "github.com/ava-labs/avalanchego/vms/platformvm/config" "github.com/ava-labs/avalanchego/vms/platformvm/metrics" + "github.com/ava-labs/avalanchego/vms/platformvm/network" "github.com/ava-labs/avalanchego/vms/platformvm/reward" "github.com/ava-labs/avalanchego/vms/platformvm/signer" "github.com/ava-labs/avalanchego/vms/platformvm/state" @@ -2218,6 +2225,61 @@ func TestSubnetValidatorSetAfterPrimaryNetworkValidatorRemoval(t *testing.T) { require.NoError(err) } +func TestValidatorSetRaceCondition(t *testing.T) { + require := require.New(t) + vm, _, _ := defaultVM(t, cortinaFork) + vm.ctx.Lock.Lock() + defer vm.ctx.Lock.Unlock() + + nodeID := ids.GenerateTestNodeID() + require.NoError(vm.Connected(context.Background(), nodeID, version.CurrentApp)) + + protocolAppRequestBytest, err := gossip.MarshalAppRequest( + bloom.EmptyFilter.Marshal(), + ids.Empty[:], + ) + require.NoError(err) + + appRequestBytes := p2p.PrefixMessage( + p2p.ProtocolPrefix(network.TxGossipHandlerID), + protocolAppRequestBytest, + ) + + var eg errgroup.Group + for i := 0; i < 100; i++ { + eg.Go(func() error { + return vm.AppRequest( + context.Background(), + nodeID, + 0, + time.Now().Add(time.Hour), + appRequestBytes, + ) + }) + } + + // If the validator set lock isn't held, the race detector should fail here. + for i := uint64(0); i < 100; i++ { + blk, err := block.NewBanffStandardBlock( + time.Now(), + vm.state.GetLastAccepted(), + i, + nil, + ) + require.NoError(err) + + vm.state.SetLastAccepted(blk.ID()) + vm.state.SetHeight(blk.Height()) + vm.state.AddStatelessBlock(blk) + } + + // If the validator set lock is grabbed, we need to make sure to release the + // lock to avoid a deadlock. + vm.ctx.Lock.Unlock() + require.NoError(eg.Wait()) + vm.ctx.Lock.Lock() +} + func buildAndAcceptStandardBlock(vm *VM) error { blk, err := vm.Builder.BuildBlock(context.Background()) if err != nil { diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 1775e5db2c20..7cc53bb2320d 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -277,13 +277,14 @@ func defaultVM(t *testing.T, fork activeFork) (*VM, database.Database, *mutableS return nil } + dynamicConfigBytes := []byte(`{"network":{"max-validator-set-staleness":0}}`) require.NoError(vm.Initialize( context.Background(), ctx, chainDB, genesisBytes, nil, - nil, + dynamicConfigBytes, msgChan, nil, appSender,