Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memberlist: prepare data to send before starting counting the write timeout #89

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
* [ENHANCEMENT] Add spanlogger package. #42
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77
* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
35 changes: 20 additions & 15 deletions kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,15 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error {
}
}()

if t.cfg.PacketWriteTimeout > 0 {
deadline := time.Now().Add(t.cfg.PacketWriteTimeout)
err := c.SetDeadline(deadline)
if err != nil {
return fmt.Errorf("setting deadline: %v", err)
}
}
// Compute the digest *before* setting the deadline on the connection (so that the time
// it takes to compute the digest is not taken in account).
// We use md5 as quick and relatively short hash, not in cryptographic context.
// It's also used to detect if the whole packet has been received on the receiver side.
digest := md5.Sum(b)

buf := bytes.Buffer{}
buf.WriteByte(byte(packet))
// Prepare the header *before* setting the deadline on the connection.
headerBuf := bytes.Buffer{}
headerBuf.WriteByte(byte(packet))

// We need to send our address to the other side, otherwise other side can only see IP and port from TCP header.
// But that doesn't match our node address (new TCP connection has new random port), which confuses memberlist.
Expand All @@ -459,10 +458,18 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error {
return fmt.Errorf("local address too long")
}

buf.WriteByte(byte(len(ourAddr)))
buf.WriteString(ourAddr)
headerBuf.WriteByte(byte(len(ourAddr)))
headerBuf.WriteString(ourAddr)

_, err = c.Write(buf.Bytes())
if t.cfg.PacketWriteTimeout > 0 {
deadline := time.Now().Add(t.cfg.PacketWriteTimeout)
err := c.SetDeadline(deadline)
if err != nil {
return fmt.Errorf("setting deadline: %v", err)
}
}

_, err = c.Write(headerBuf.Bytes())
if err != nil {
return fmt.Errorf("sending local address: %v", err)
}
Expand All @@ -475,9 +482,7 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error {
return fmt.Errorf("sending data: short write")
}

// Append digest. We use md5 as quick and relatively short hash, not in cryptographic context.
// This helped to find some bugs, so let's keep it.
digest := md5.Sum(b)
// Append digest.
n, err = c.Write(digest[:])
if err != nil {
return fmt.Errorf("digest: %v", err)
Expand Down