Skip to content

Commit

Permalink
protocolv2: fix a lock leak in UDP underlay
Browse files Browse the repository at this point in the history
  • Loading branch information
enfein committed Aug 31, 2023
1 parent 3755ef3 commit f422ae5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/protocolv2/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestIPv4UDPUnderlay(t *testing.T) {
transportProtocol: netutil.UDPTransport,
remoteAddr: &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port},
}
runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 1)
runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 2)
if err := serverMux.Close(); err != nil {
t.Errorf("Server mux close failed: %v", err)
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestIPv6UDPUnderlay(t *testing.T) {
transportProtocol: netutil.UDPTransport,
remoteAddr: &net.UDPAddr{IP: net.ParseIP("::1"), Port: port},
}
runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 1)
runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 2)
if err := serverMux.Close(); err != nil {
t.Errorf("Server mux close failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocolv2/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
MaxPDU = 16 * 1024

// Maxinum number of transmission before marking the session as dead.
txCountLimit = 10
txCountLimit = 20

// Number of fast ack received before retransmission.
fastAckLimit = 3
Expand Down
13 changes: 6 additions & 7 deletions pkg/protocolv2/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
const (
segmentTreeCapacity = 4096
segmentChanCapacity = 256
minWindowSize = 32
minWindowSize = 16
maxWindowSize = 4096
segmentPollInterval = 10 * time.Millisecond
segmentAckDelay = 20 * time.Millisecond
segmentAckDelay = 50 * time.Millisecond
)

type sessionState int
Expand Down Expand Up @@ -73,7 +73,6 @@ type Session struct {

nextSeq uint32 // next sequence number to send a segment
nextRecv uint32 // next sequence number to receive
unackSeq uint32 // unacknowledged sequence number
lastTXTime time.Time // last timestamp when a segment is sent
unreadBuf []byte // payload removed from the recvQueue that haven't been read by application

Expand Down Expand Up @@ -276,7 +275,7 @@ func (s *Session) Write(b []byte) (n int, err error) {
},
sessionID: s.id,
seq: s.nextSeq,
unAckSeq: s.unackSeq,
unAckSeq: s.nextRecv,
windowSize: uint16(mathext.Max(0, int(s.sendAlgorithm.CongestionWindowSize())-s.recvBuf.Len())),
fragment: uint8(i),
payloadLen: uint16(partLen),
Expand Down Expand Up @@ -478,8 +477,8 @@ func (s *Session) runOutputLoop(ctx context.Context) error {
metadata: &dataAckStruct{
baseStruct: baseStruct,
sessionID: s.id,
seq: s.unackSeq,
unAckSeq: s.unackSeq,
seq: uint32(mathext.Max(0, int(s.nextRecv)-1)),
unAckSeq: s.nextRecv,
windowSize: uint16(mathext.Max(0, int(s.sendAlgorithm.CongestionWindowSize())-s.recvBuf.Len())),
},
}
Expand Down Expand Up @@ -588,8 +587,8 @@ func (s *Session) inputAck(seg *segment) error {
}
s.rttStat.UpdateRTT(time.Since(seg2.txTime))
s.sendAlgorithm.OnAck()
s.remoteWindowSize = seg2.metadata.(*dataAckStruct).windowSize
}
s.remoteWindowSize = das.windowSize
return nil
default:
return fmt.Errorf("unsupported transport protocol %v", s.conn.TransportProtocol())
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocolv2/underlay_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,11 @@ func (u *UDPUnderlay) writeOneSegment(seg *segment, addr *net.UDPAddr) error {
u.sessionLock.Lock()
session, ok := u.sessionMap[sessionID]
if !ok {
u.sessionLock.Unlock()
return fmt.Errorf("session %d not found", sessionID)
}
if session.block == nil {
u.sessionLock.Unlock()
return fmt.Errorf("%v cipher block is not ready: %w", session, stderror.ErrNotReady)
} else {
blockCipher = session.block
Expand Down

0 comments on commit f422ae5

Please sign in to comment.