Skip to content

Commit

Permalink
p2p, swarm: fix node up races by granular locking (#18976)
Browse files Browse the repository at this point in the history
* swarm/network: DRY out repeated giga comment

I not necessarily agree with the way we wait for event propagation.
But I truly disagree with having duplicated giga comments.

* p2p/simulations: encapsulate Node.Up field so we avoid data races

The Node.Up field was accessed concurrently without "proper" locking.
There was a lock on Network and that was used sometimes to access
the  field. Other times the locking was missed and we had
a data race.

For example: ethereum/go-ethereum#18464
The case above was solved, but there were still intermittent/hard to
reproduce races. So let's solve the issue permanently.

resolves: ethersphere/swarm#1146

* p2p/simulations: fix unmarshal of simulations.Node

Making Node.Up field private in 13292ee897e345045fbfab3bda23a77589a271c1
broke TestHTTPNetwork and TestHTTPSnapshot. Because the default
UnmarshalJSON does not handle unexported fields.

Important: The fix is partial and not proper to my taste. But I cut
scope as I think the fix may require a change to the current
serialization format. New ticket:
ethersphere/swarm#1177

* p2p/simulations: Add a sanity test case for Node.Config UnmarshalJSON

* p2p/simulations: revert back to defer Unlock() pattern for Network

It's a good patten to call `defer Unlock()` right after `Lock()` so
(new) error cases won't miss to unlock. Let's get back to that pattern.

The patten was abandoned in 85a79b3ad3c5863f8612d25c246bcfad339f36b7,
while fixing a data race. That data race does not exist anymore,
since the Node.Up field got hidden behind its own lock.

* p2p/simulations: consistent naming for test providers Node.UnmarshalJSON

* p2p/simulations: remove JSON annotation from private fields of Node

As unexported fields are not serialized.

* p2p/simulations: fix deadlock in Network.GetRandomDownNode()

Problem: GetRandomDownNode() locks -> getDownNodeIDs() ->
GetNodes() tries to lock -> deadlock

On Network type, unexported functions must assume that `net.lock`
is already acquired and should not call exported functions which
might try to lock again.

* p2p/simulations: ensure method conformity for Network

Connect* methods were moved to p2p/simulations.Network from
swarm/network/simulation. However these new methods did not follow
the pattern of Network methods, i.e., all exported method locks
the whole Network either for read or write.

* p2p/simulations: fix deadlock during network shutdown

`TestDiscoveryPersistenceSimulationSimAdapter` often got into deadlock.
The execution was stuck on two locks, i.e, `Kademlia.lock` and
`p2p/simulations.Network.lock`. Usually the test got stuck once in each
20 executions with high confidence.

`Kademlia` was stuck in `Kademlia.EachAddr()` and `Network` in
`Network.Stop()`.

Solution: in `Network.Stop()` `net.lock` must be released before
calling `node.Stop()` as stopping a node (somehow - I did not find
the exact code path) causes `Network.InitConn()` to be called from
`Kademlia.SuggestPeer()` and that blocks on `net.lock`.

Related ticket: ethersphere/swarm#1223

* swarm/state: simplify if statement in DBStore.Put()

* p2p/simulations: remove faulty godoc from private function

The comment started with the wrong method name.

The method is simple and self explanatory. Also, it's private.
=> Let's just remove the comment.

(cherry picked from commit 50b872bf05b8644f14b9bea340092ced6968dd59)
  • Loading branch information
HighGoal1991 committed Feb 19, 2019
1 parent ceea75d commit 0396842
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 123 deletions.
43 changes: 32 additions & 11 deletions p2p/simulations/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var (
// It is useful when constructing a chain network topology
// when Network adds and removes nodes dynamically.
func (net *Network) ConnectToLastNode(id enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

ids := net.getUpNodeIDs()
l := len(ids)
if l < 2 {
Expand All @@ -41,29 +44,35 @@ func (net *Network) ConnectToLastNode(id enode.ID) (err error) {
if last == id {
last = ids[l-2]
}
return net.connect(last, id)
return net.connectNotConnected(last, id)
}

// ConnectToRandomNode connects the node with provided NodeID
// to a random node that is up.
func (net *Network) ConnectToRandomNode(id enode.ID) (err error) {
selected := net.GetRandomUpNode(id)
net.lock.Lock()
defer net.lock.Unlock()

selected := net.getRandomUpNode(id)
if selected == nil {
return ErrNodeNotFound
}
return net.connect(selected.ID(), id)
return net.connectNotConnected(selected.ID(), id)
}

// ConnectNodesFull connects all nodes one to another.
// It provides a complete connectivity in the network
// which should be rarely needed.
func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
for i, lid := range ids {
for _, rid := range ids[i+1:] {
if err = net.connect(lid, rid); err != nil {
if err = net.connectNotConnected(lid, rid); err != nil {
return err
}
}
Expand All @@ -74,12 +83,19 @@ func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) {
// ConnectNodesChain connects all nodes in a chain topology.
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

return net.connectNodesChain(ids)
}

func (net *Network) connectNodesChain(ids []enode.ID) (err error) {
if ids == nil {
ids = net.getUpNodeIDs()
}
l := len(ids)
for i := 0; i < l-1; i++ {
if err := net.connect(ids[i], ids[i+1]); err != nil {
if err := net.connectNotConnected(ids[i], ids[i+1]); err != nil {
return err
}
}
Expand All @@ -89,39 +105,44 @@ func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) {
// ConnectNodesRing connects all nodes in a ring topology.
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
l := len(ids)
if l < 2 {
return nil
}
if err := net.ConnectNodesChain(ids); err != nil {
if err := net.connectNodesChain(ids); err != nil {
return err
}
return net.connect(ids[l-1], ids[0])
return net.connectNotConnected(ids[l-1], ids[0])
}

// ConnectNodesStar connects all nodes into a star topology
// If ids argument is nil, all nodes that are up will be connected.
func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error) {
net.lock.Lock()
defer net.lock.Unlock()

if ids == nil {
ids = net.getUpNodeIDs()
}
for _, id := range ids {
if center == id {
continue
}
if err := net.connect(center, id); err != nil {
if err := net.connectNotConnected(center, id); err != nil {
return err
}
}
return nil
}

// connect connects two nodes but ignores already connected error.
func (net *Network) connect(oneID, otherID enode.ID) error {
return ignoreAlreadyConnectedErr(net.Connect(oneID, otherID))
func (net *Network) connectNotConnected(oneID, otherID enode.ID) error {
return ignoreAlreadyConnectedErr(net.connect(oneID, otherID))
}

func ignoreAlreadyConnectedErr(err error) error {
Expand Down
2 changes: 1 addition & 1 deletion p2p/simulations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event {
func (e *Event) String() string {
switch e.Type {
case EventTypeNode:
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up)
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up())
case EventTypeConn:
return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up)
case EventTypeMsg:
Expand Down
18 changes: 10 additions & 8 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ type expectEvents struct {
}

func (t *expectEvents) nodeEvent(id string, up bool) *Event {
node := Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
up: up,
}
return &Event{
Type: EventTypeNode,
Node: &Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
Up: up,
},
Node: &node,
}
}

Expand Down Expand Up @@ -480,6 +481,7 @@ loop:
}

func (t *expectEvents) expect(events ...*Event) {
t.Helper()
timeout := time.After(10 * time.Second)
i := 0
for {
Expand All @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) {
if event.Node.ID() != expected.Node.ID() {
t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
}
if event.Node.Up != expected.Node.Up {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
if event.Node.Up() != expected.Node.Up() {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up())
}

case EventTypeConn:
Expand Down
9 changes: 5 additions & 4 deletions p2p/simulations/mocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) {
for {
select {
case event := <-events:
//if the event is a node Up event only
if event.Node != nil && event.Node.Up {
if isNodeUp(event) {
//add the correspondent node ID to the map
nodemap[event.Node.Config.ID] = true
//this means all nodes got a nodeUp event, so we can continue the test
if len(nodemap) == nodeCount {
nodesComplete = true
//wait for 3s as the mocker will need time to connect the nodes
//time.Sleep( 3 *time.Second)
}
} else if event.Conn != nil && nodesComplete {
connCount += 1
Expand Down Expand Up @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) {
t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo))
}
}

func isNodeUp(event *Event) bool {
return event.Node != nil && event.Node.Up()
}
Loading

0 comments on commit 0396842

Please sign in to comment.