raft: transport package #1748
Conversation
func (p *peer) stop() {
	close(p.stopped)
	<-p.done
Seems like this could block for a long time if p.msgc has a lot of messages enqueued. Maybe processLoop should check if p.stopped is closed before selecting on p.msgc.
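A minimal sketch of that suggestion, assuming processLoop takes a context and that the peer owns msgc, stopped, and done as in the diff above; handleMessage is a hypothetical helper:

```go
// Sketch only: processLoop drains p.msgc but checks p.stopped first,
// so stop() is not blocked behind a long message backlog.
func (p *peer) processLoop(ctx context.Context) {
	defer close(p.done)
	for {
		// Fast-path exit: if stop() has been called, return even if
		// p.msgc still has messages queued.
		select {
		case <-p.stopped:
			return
		default:
		}
		select {
		case m := <-p.msgc:
			p.handleMessage(ctx, m) // hypothetical per-message handler
		case <-p.stopped:
			return
		case <-ctx.Done():
			return
		}
	}
}
```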
Or maybe it would make sense to reuse WithContext here, and closing p.stopped would cause the context to be cancelled.
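A hedged sketch of that alternative: give the peer a cancellable context (via context.WithCancel, or the package's WithContext helper if that is what it wraps) and have stop() cancel it instead of closing a separate channel. The field names here are assumptions:

```go
// Hypothetical peer shape: a cancellable context replaces the separate
// p.stopped channel, so every select only watches p.ctx.Done().
type peer struct {
	ctx    context.Context
	cancel context.CancelFunc
	done   chan struct{}
	// msgc and the rest of the existing fields stay as they are
}

func (p *peer) stop() {
	p.cancel() // unblocks every select waiting on p.ctx.Done()
	<-p.done
}
```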
case <-p.stopped:
	return errors.New("peer stopped")
case <-ctx.Done():
	return ctx.Err()
<-p.stopped and <-ctx.Done() should be checked in a different select from p.msgc <- m. Otherwise the p.msgc <- m branch can be randomly chosen.
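A sketch of the two-select pattern being suggested; the sendProcessMessage name and raftpb.Message parameter are assumptions, while msgc, stopped, and the error cases come from the diff above:

```go
// Sketch: check the stop/cancel channels in their own select first, so
// the send branch cannot be chosen at random while they are ready.
func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
	select {
	case <-p.stopped:
		return errors.New("peer stopped")
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	select {
	case p.msgc <- m:
		return nil
	case <-p.stopped:
		return errors.New("peer stopped")
	case <-ctx.Done():
		return ctx.Err()
	}
}
```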
for _, e := range errs {
	errStr += "\n" + e.Error()
}
return errors.Errorf("errors occured during Send: %s", errStr)
I assume these eventually end up in the log. I think it's a lot cleaner to just log each error separately. Logging something containing newlines is pretty bad.
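A rough sketch of that suggestion: log each error on its own line and return a short single-line summary instead of a newline-joined string. log.G(ctx) follows the logging style seen elsewhere in this PR; the exact message text is illustrative:

```go
// Sketch: log each failure individually, return one single-line summary.
for _, e := range errs {
	log.G(ctx).WithError(e).Error("failed to send raft message")
}
if len(errs) > 0 {
	return errors.Errorf("%d errors occurred during Send", len(errs))
}
return nil
```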
		p.active = false
		p.mu.Unlock()
	}
}()
Not sure if this defer is necessary at all. If the message queue is full, then most likely the node is down and sendProcessMessage will call ReportUnreachable. If the context is cancelled or p.stopped is closed, I don't think calling ReportUnreachable is appropriate.
if err != nil {
	return err
}
p, err := newPeer(m.To, addr, t)
It's problematic to use a peer that isn't tracked in the t.peers list. For example, that peer might call ReportUnreachable after raft has shut down.
Force-pushed from ffcf2fd to 1e956bc
@aaronlehmann PTAL. I'm still not sure how to handle messages to unknown peers properly. Now I just create a peer for each one, which might not be desirable on …
What is the downside?
Current coverage is 55.00% (diff: 65.15%)

@@             master    #1748     diff @@
==========================================
  Files           103      105       +2
  Lines         17250    17518     +268
  Methods           0        0
  Messages          0        0
  Branches          0        0
==========================================
+ Hits           9456     9636     +180
- Misses         6649     6724      +75
- Partials       1145     1158      +13
@aaronlehmann it calls Dial (we probably need to call the health check there as well), which might block sending messages for some time.
Yeah, that's not good. If a dedicated goroutine for sending messages to unknown peers would solve the problem, it's worth considering.
Force-pushed from 1e956bc to 9923164
@aaronlehmann PTAL
require.NoError(t, c.Add(2))

// set channel to nil to emulate full queue
c.Get(1).tr.peers[2].msgc = nil
Data race here
// unknownSender sends messages to unknown peers. It creates new peer for each
// message and discards it after send.
func (t *Transport) unknownSender(ctx context.Context) {
I don't think this goroutine is giving us anything over the original goroutine-per-unknown-send approach. The problem I had with the original approach is that there was no way to make sure the goroutine was done before shutting down raft, but this seems to have the same problem.
Maybe going back to the original approach of spawning a goroutine every time we need to talk to an unknown sender, plus a wait group that run waits on, would do the trick?
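A sketch of that approach, assuming a hypothetical sync.WaitGroup field (unknownWG) on Transport and a hypothetical single-message send helper; only the newPeer signature is taken from the diff above:

```go
// Sketch: spawn a goroutine per message to an unknown peer and track it
// with a WaitGroup (unknownWG is a hypothetical field on Transport), so
// shutdown can wait for these sends before raft goes away.
func (t *Transport) sendToUnknown(ctx context.Context, m raftpb.Message, addr string) {
	t.unknownWG.Add(1)
	go func() {
		defer t.unknownWG.Done()
		p, err := newPeer(m.To, addr, t) // signature taken from the diff above
		if err != nil {
			return
		}
		defer p.stop()
		p.sendProcessMessage(ctx, m) // hypothetical single-message send
	}()
}

// run (or Done) would then call t.unknownWG.Wait() before signalling
// that the transport has fully shut down.
```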
I don't understand why we need to be sure that raft is up in this case. ReportUnreachable and ReportSnapshot are no-ops in those cases. Cancelling the request should be enough.
I can move this to the run goroutine, so Done will wait until the unknown-peer sends are processed as well.
Force-pushed from 8d6fc76 to 542aad9
@aaronlehmann PTAL, now you can wait on Done until everything is finished.
That looks much better, thanks.
	p.tr.config.Raft.ReportSnapshot(m.To, raft.SnapshotFailure)
}
p.tr.config.Raft.ReportUnreachable(m.To)
if grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
There is a slight change to this in #1779 that should be replicated here.
case <-ctx.Done():
	return ctx.Err()
case <-t.ctx.Done():
	return ctx.Err()
return t.ctx.Err()?
@@ -539,6 +539,8 @@ func ContextErr(err error) StreamError {
 	case context.Canceled:
 		return streamErrorf(codes.Canceled, "%v", err)
 	}
+	fmt.Printf("%T %v\n", err, err)
+	fmt.Printf("%T %v\n", context.Canceled, context.Canceled)
Committed unintentionally?
I think we should move forward with trying to port the raft code to use the new transport package. If we wait, it will become harder as the code diverges.
@aaronlehmann thanks! Will do.
Force-pushed from 542aad9 to 262958c
@aaronlehmann I've replaced the code with the transport package. However, I blindly removed stuff with …
Force-pushed from 8bc8c79 to 5b73465
Ok, it passes docker integration. Will fix the comments on Monday.
@aaronlehmann @cyli I've split the logic out of restoreFromSnapshot.
Force-pushed from 5d49d47 to c047a61
LGTM
m, ok := oldMembers[removedMember]
if !ok {
	continue
}
This should call RemoveMember even if the member is not in oldMembers, because we need to keep track of the fact that this member ID was removed from the cluster.
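A hedged sketch of the suggested ordering; the surrounding loop variable, the n.cluster.RemoveMember call, and the cleanup step are assumptions beyond what the quoted diff shows:

```go
// Sketch: record the removal unconditionally, then do the extra cleanup
// only when the member's details are known.
for _, removedMember := range removedMembers {
	if err := n.cluster.RemoveMember(removedMember); err != nil { // assumed membership-tracking call
		return err
	}
	m, ok := oldMembers[removedMember]
	if !ok {
		continue
	}
	// ... cleanup that needs m (connections, stored addresses, ...) ...
	_ = m
}
```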
@@ -215,38 +222,18 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
 	<-viewStarted
 }

-func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
+func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) (api.ClusterSnapshot, error) {
forceNewCluster is not used anymore.
@aaronlehmann Fixed, thanks! Also, I have 52 sequential passes of the integration test on my machine so far.
Force-pushed from acee090 to b576841
@aaronlehmann PTAL. I've added address change handling.
type hostsStore struct {
	mu    sync.Mutex
	hosts map[uint64]string
}
What is this for?
@@ -841,11 +935,15 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID
 	return n.configure(ctx, cc)
 }

-// updateMember submits a configuration change to change a member's address.
-func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
+// updateNodeBlocking runs synchronous job to update node addres in whole cluster.
typo: address
@@ -435,9 +497,36 @@ func (n *Node) Run(ctx context.Context) error {
 	// saveToStorage.
 	if !raft.IsEmptySnap(rd.Snapshot) {
 		// Load the snapshot data into the store
-		if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
+		snapCluster, err := n.clusterSnapshot(rd.Snapshot.Data)
+		if err != nil {
 			log.G(ctx).WithError(err).Error("failed to restore from snapshot")
Should this clear snapCluster? It seems bad to use it below if an error was returned.
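A minimal sketch of the guard being asked for, assuming the zero value of api.ClusterSnapshot is safe to treat as "no snapshot data":

```go
// Sketch: drop the partially decoded snapshot on error so the
// member-handling code below never acts on it.
snapCluster, err := n.clusterSnapshot(rd.Snapshot.Data)
if err != nil {
	log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
	snapCluster = api.ClusterSnapshot{} // assumption: zero value means "no snapshot data"
}
```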
if err := p.updateAddr(addr); err != nil {
	return err
}
log.G(t.ctx).Debugf("peer %x updated to address %s, it will be used if old failed", id, addr)
In my testing, with a three node cluster that has one node down, I'm seeing this log line every second in each of the two remaining managers' logs (referring to the other remaining manager in each case). This doesn't seem right because neither address has changed.
I tested the address change detection and it seems to work, except for the spammy log message.
@aaronlehmann I've fixed the message and your other comments. Thanks for the review and testing!
LGTM
This package is a separate gRPC transport layer for the raft package. Previously we used the membership package plus one very big method in the raft package. Signed-off-by: Alexander Morozov <[email protected]>
Ok, I'm trying one last time with docker and then merging.
Only TestSwarmNetworkPlugin fails, which is expected after #1856.
I'm trying to come up with a smaller, testable package for the raft transport to simplify membership.Cluster and get better coverage. It's just a start, and it doesn't pass the linters or whatever for now. I'd appreciate any feedback.
ping @aaronlehmann