From f422ae5f9990060e0b7d304b2ad075c498c20b7f Mon Sep 17 00:00:00 2001 From: enfein <83481737+enfein@users.noreply.github.com> Date: Thu, 31 Aug 2023 23:18:19 +0000 Subject: [PATCH] protocolv2: fix a lock leak in UDP underlay --- pkg/protocolv2/mux_test.go | 4 ++-- pkg/protocolv2/segment.go | 2 +- pkg/protocolv2/session.go | 13 ++++++------- pkg/protocolv2/underlay_udp.go | 2 ++ 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/protocolv2/mux_test.go b/pkg/protocolv2/mux_test.go index 5630dfb9..c5b07137 100644 --- a/pkg/protocolv2/mux_test.go +++ b/pkg/protocolv2/mux_test.go @@ -196,7 +196,7 @@ func TestIPv4UDPUnderlay(t *testing.T) { transportProtocol: netutil.UDPTransport, remoteAddr: &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: port}, } - runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 1) + runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 2) if err := serverMux.Close(); err != nil { t.Errorf("Server mux close failed: %v", err) } @@ -234,7 +234,7 @@ func TestIPv6UDPUnderlay(t *testing.T) { transportProtocol: netutil.UDPTransport, remoteAddr: &net.UDPAddr{IP: net.ParseIP("::1"), Port: port}, } - runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 1) + runClient(t, clientDescriptor, []byte("xiaochitang"), []byte("kuiranbudong"), 2) if err := serverMux.Close(); err != nil { t.Errorf("Server mux close failed: %v", err) } diff --git a/pkg/protocolv2/segment.go b/pkg/protocolv2/segment.go index bb666beb..9c727601 100644 --- a/pkg/protocolv2/segment.go +++ b/pkg/protocolv2/segment.go @@ -33,7 +33,7 @@ const ( MaxPDU = 16 * 1024 // Maxinum number of transmission before marking the session as dead. - txCountLimit = 10 + txCountLimit = 20 // Number of fast ack received before retransmission. fastAckLimit = 3 diff --git a/pkg/protocolv2/session.go b/pkg/protocolv2/session.go index a00d1e8f..2f081a49 100644 --- a/pkg/protocolv2/session.go +++ b/pkg/protocolv2/session.go @@ -35,10 +35,10 @@ import ( const ( segmentTreeCapacity = 4096 segmentChanCapacity = 256 - minWindowSize = 32 + minWindowSize = 16 maxWindowSize = 4096 segmentPollInterval = 10 * time.Millisecond - segmentAckDelay = 20 * time.Millisecond + segmentAckDelay = 50 * time.Millisecond ) type sessionState int @@ -73,7 +73,6 @@ type Session struct { nextSeq uint32 // next sequence number to send a segment nextRecv uint32 // next sequence number to receive - unackSeq uint32 // unacknowledged sequence number lastTXTime time.Time // last timestamp when a segment is sent unreadBuf []byte // payload removed from the recvQueue that haven't been read by application @@ -276,7 +275,7 @@ func (s *Session) Write(b []byte) (n int, err error) { }, sessionID: s.id, seq: s.nextSeq, - unAckSeq: s.unackSeq, + unAckSeq: s.nextRecv, windowSize: uint16(mathext.Max(0, int(s.sendAlgorithm.CongestionWindowSize())-s.recvBuf.Len())), fragment: uint8(i), payloadLen: uint16(partLen), @@ -478,8 +477,8 @@ func (s *Session) runOutputLoop(ctx context.Context) error { metadata: &dataAckStruct{ baseStruct: baseStruct, sessionID: s.id, - seq: s.unackSeq, - unAckSeq: s.unackSeq, + seq: uint32(mathext.Max(0, int(s.nextRecv)-1)), + unAckSeq: s.nextRecv, windowSize: uint16(mathext.Max(0, int(s.sendAlgorithm.CongestionWindowSize())-s.recvBuf.Len())), }, } @@ -588,8 +587,8 @@ func (s *Session) inputAck(seg *segment) error { } s.rttStat.UpdateRTT(time.Since(seg2.txTime)) s.sendAlgorithm.OnAck() - s.remoteWindowSize = seg2.metadata.(*dataAckStruct).windowSize } + s.remoteWindowSize = das.windowSize return nil default: return fmt.Errorf("unsupported transport protocol %v", s.conn.TransportProtocol()) diff --git a/pkg/protocolv2/underlay_udp.go b/pkg/protocolv2/underlay_udp.go index 78d7a8e6..fb01698a 100644 --- a/pkg/protocolv2/underlay_udp.go +++ b/pkg/protocolv2/underlay_udp.go @@ -533,9 +533,11 @@ func (u *UDPUnderlay) writeOneSegment(seg *segment, addr *net.UDPAddr) error { u.sessionLock.Lock() session, ok := u.sessionMap[sessionID] if !ok { + u.sessionLock.Unlock() return fmt.Errorf("session %d not found", sessionID) } if session.block == nil { + u.sessionLock.Unlock() return fmt.Errorf("%v cipher block is not ready: %w", session, stderror.ErrNotReady) } else { blockCipher = session.block