Skip to content

Commit

Permalink
Remove latency accept
Browse files Browse the repository at this point in the history
The reason for removing latency accept is that we are operating on L7
now, so we are not going to hijack L4-level accept() function call.

Also, the initial implementation has a bug where if connections are not
created consecutively, the latency accept will not work, as what will
happen is:
a) set latency accept
b) latency in effect
c) accept waiting for new connections
d) new connection comes in, establish it
e) go to b -> so the time spent on waiting for a new connection is
actually counted as the "latency accept" time, not just the time between
the connection request is initialized by the client and the time the
server actually accepts it.

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed Sep 25, 2024
1 parent 8d627e2 commit 26b6772
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 166 deletions.
133 changes: 6 additions & 127 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ type Server interface {
// Close closes listener and transport.
Close() error

// DelayAccept adds latency ± random variable to accepting
// new incoming connections.
DelayAccept(latency, rv time.Duration)
// UndelayAccept removes sending latencies.
UndelayAccept()
// LatencyAccept returns current latency on accepting
// new incoming connections.
LatencyAccept() time.Duration

// DelayTx adds latency ± random variable for "outgoing" traffic
// in "sending" layer.
DelayTx(latency, rv time.Duration)
Expand Down Expand Up @@ -151,10 +142,7 @@ type server struct {
closeHijackedConn sync.WaitGroup

listenerMu sync.RWMutex
listener *customListener

latencyAcceptMu sync.RWMutex
latencyAccept time.Duration
listener *net.Listener

modifyTxMu sync.RWMutex
modifyTx func(data []byte) []byte
Expand Down Expand Up @@ -242,10 +230,7 @@ func NewServer(cfg ServerConfig) Server {
return nil
}

s.listener = &customListener{
s: s,
l: &ln,
}
s.listener = &ln

go func() {
defer s.closeWg.Done()
Expand All @@ -256,7 +241,7 @@ func NewServer(cfg ServerConfig) Server {

s.lg.Info("proxy is listening on", zap.String("listen on", s.Listen()))
close(s.readyc)
if err := s.httpServer.Serve(s.listener); err != http.ErrServerClosed {
if err := s.httpServer.Serve(*s.listener); err != http.ErrServerClosed {
// always returns error. ErrServerClosed on graceful close
panic(fmt.Sprintf("startHTTPServer Serve(): %v", err))
}
Expand All @@ -266,74 +251,6 @@ func NewServer(cfg ServerConfig) Server {
return s
}

// Because we are implementing L7 proxy, but would like to keep the L4 features,
// thus, we need to encapsulate the L4 functionalities in our custom Listener
type customListener struct {
s *server
l *net.Listener
}

func (c *customListener) Accept() (net.Conn, error) {
// we implement the L4 features here
c.s.latencyAcceptMu.RLock()
lat := c.s.latencyAccept
c.s.lg.Info(
"get accept latency",
zap.Duration("latency", lat),
)
c.s.latencyAcceptMu.RUnlock()
if lat > 0 {
select {
case <-time.After(lat):
case <-c.s.donec:
return nil, fmt.Errorf("listener is closed")
}
}

c.s.listenerMu.RLock()
conn, err := (*c.l).Accept()
c.s.listenerMu.RUnlock()
if err != nil {
select {
case c.s.errc <- err:
select {
case <-c.s.donec:
return nil, err
default:
}
case <-c.s.donec:
return nil, err
}
c.s.lg.Debug("listener accept error", zap.Error(err))

if strings.HasSuffix(err.Error(), "use of closed network connection") {
select {
case <-time.After(c.s.retryInterval):
case <-c.s.donec:
return nil, err
}
c.s.lg.Debug("listener is closed")
}
}

return conn, err
}

// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (c *customListener) Close() error {
c.s.listenerMu.RLock()
defer c.s.listenerMu.RUnlock()
return (*c.l).Close()
}

// Addr returns the listener's network address.
func (c *customListener) Addr() net.Addr {
c.s.listenerMu.RLock()
defer c.s.listenerMu.RUnlock()
return (*c.l).Addr()
}

type serverHandler struct {
s *server
}
Expand Down Expand Up @@ -511,7 +428,7 @@ func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
}
data := buf[:nr1]

// alters/corrupts/drops data
// drops data
switch ptype {
case proxyTx:
s.modifyTxMu.RLock()
Expand Down Expand Up @@ -550,15 +467,15 @@ func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
switch ptype {
case proxyTx:
s.lg.Debug(
"modified tx",
"proxyTx",
zap.String("data-received", humanize.Bytes(uint64(nr1))),
zap.String("data-modified", humanize.Bytes(uint64(nr2))),
zap.String("proxy listening on", s.Listen()),
zap.Int("to peer port", peerPort),
)
case proxyRx:
s.lg.Debug(
"modified rx",
"proxyRx",
zap.String("data-received", humanize.Bytes(uint64(nr1))),
zap.String("data-modified", humanize.Bytes(uint64(nr2))),
zap.String("proxy listening on", s.Listen()),
Expand Down Expand Up @@ -706,44 +623,6 @@ func (s *server) Close() (err error) {
return err
}

func (s *server) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
}
d := computeLatency(latency, rv)
s.latencyAcceptMu.Lock()
s.latencyAccept = d
s.latencyAcceptMu.Unlock()

s.lg.Info(
"set accept latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) UndelayAccept() {
s.latencyAcceptMu.Lock()
d := s.latencyAccept
s.latencyAccept = 0
s.latencyAcceptMu.Unlock()

s.lg.Info(
"removed accept latency",
zap.Duration("latency", d),
zap.String("proxy listening on", s.Listen()),
)
}

func (s *server) LatencyAccept() time.Duration {
s.latencyAcceptMu.RLock()
d := s.latencyAccept
s.latencyAcceptMu.RUnlock()
return d
}

func (s *server) DelayTx(latency, rv time.Duration) {
if latency <= 0 {
return
Expand Down
40 changes: 1 addition & 39 deletions pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func destroy(t *testing.T, writec chan []byte, donec chan struct{}, proxyServer
select {
case <-proxyServer.Done():
case err := <-proxyServer.Error():
if !strings.HasPrefix(err.Error(), "accept ") ||
!strings.HasSuffix(err.Error(), "use of closed network connection") {
if !strings.HasPrefix(err.Error(), "accept ") && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
case <-time.After(3 * time.Second):
Expand Down Expand Up @@ -295,43 +294,6 @@ func testServer(t *testing.T, delayTx bool) {
}
}

func TestServer_DelayAccept(t *testing.T) {
recvc, donec, writec, proxyServer, httpServer, sendData := prepare(t, false)
defer destroy(t, writec, donec, proxyServer, false, httpServer)
go func() {
defer close(donec)
for data := range writec {
sendData(data)
}
}()

data := []byte("Hello World!")
now := time.Now()
writec <- data
if d := <-recvc; !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
took1 := time.Since(now)
t.Logf("took %v with no latency", took1)
time.Sleep(1 * time.Second) // wait for the idle connection to timeout

lat, rv := 700*time.Millisecond, 10*time.Millisecond
proxyServer.DelayAccept(lat, rv)
defer proxyServer.UndelayAccept()

now = time.Now()
writec <- data
if d := <-recvc; !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
took2 := time.Since(now)
t.Logf("took %v with latency %v±%v", took2, lat, rv)

if took1 >= took2 {
t.Fatalf("expected took1 %v < took2 %v", took1, took2)
}
}

func TestServer_BlackholeTx(t *testing.T) {
recvc, donec, writec, proxyServer, httpServer, sendData := prepare(t, false)
defer destroy(t, writec, donec, proxyServer, false, httpServer)
Expand Down

0 comments on commit 26b6772

Please sign in to comment.