diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index bc71c3a16605..8166d3197680 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -59,11 +59,6 @@ type Server interface { // Close closes listener and transport. Close() error - // PauseAccept stops accepting new connections. - PauseAccept() - // UnpauseAccept removes pause operation on accepting new connections. - UnpauseAccept() - // DelayAccept adds latency ± random variable to accepting // new incoming connections. DelayAccept(latency, rv time.Duration) @@ -115,16 +110,6 @@ type Server interface { // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() - // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. - PauseTx() - // UnpauseTx removes "forwarding" pause operation. - UnpauseTx() - - // PauseRx stops "receiving" packets; "incoming" traffic blocks. - PauseRx() - // UnpauseRx removes "receiving" pause operation. - UnpauseRx() - // ResetListener closes and restarts listener. ResetListener() error } @@ -164,9 +149,6 @@ type server struct { listenerMu sync.RWMutex listener net.Listener - pauseAcceptMu sync.Mutex - pauseAcceptc chan struct{} - latencyAcceptMu sync.RWMutex latencyAccept time.Duration @@ -208,9 +190,8 @@ func NewServer(cfg ServerConfig) Server { donec: make(chan struct{}), errc: make(chan error, 16), - pauseAcceptc: make(chan struct{}), - pauseTxc: make(chan struct{}), - pauseRxc: make(chan struct{}), + pauseTxc: make(chan struct{}), + pauseRxc: make(chan struct{}), } _, fromPort, err := net.SplitHostPort(cfg.From.Host) @@ -233,7 +214,6 @@ func NewServer(cfg ServerConfig) Server { s.retryInterval = defaultRetryInterval } - close(s.pauseAcceptc) close(s.pauseTxc) close(s.pauseRxc) @@ -290,15 +270,6 @@ func (s *server) listenAndServe() { close(s.readyc) for { - s.pauseAcceptMu.Lock() - pausec := s.pauseAcceptc - s.pauseAcceptMu.Unlock() - select { - case <-pausec: - case <-s.donec: - return - } - s.latencyAcceptMu.RLock() lat := s.latencyAccept s.latencyAcceptMu.RUnlock() @@ -645,37 +616,6 @@ func (s *server) Close() (err error) { return err } -func (s *server) PauseAccept() { - s.pauseAcceptMu.Lock() - s.pauseAcceptc = make(chan struct{}) - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "paused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - -func (s *server) UnpauseAccept() { - s.pauseAcceptMu.Lock() - select { - case <-s.pauseAcceptc: // already unpaused - case <-s.donec: - s.pauseAcceptMu.Unlock() - return - default: - close(s.pauseAcceptc) - } - s.pauseAcceptMu.Unlock() - - s.lg.Info( - "unpaused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) -} - func (s *server) DelayAccept(latency, rv time.Duration) { if latency <= 0 { return diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index d19c947c6465..90fd61305d8c 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -234,55 +234,6 @@ func testServerDelayAccept(t *testing.T, secure bool) { } } -func TestServer_PauseTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.PauseTx() - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() - - select { - case d := <-recvc: - t.Fatalf("received unexpected data %q during pause", string(d)) - case <-time.After(200 * time.Millisecond): - } - - p.UnpauseTx() - - select { - case d := <-recvc: - if !bytes.Equal(data, d) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } - case <-time.After(2 * time.Second): - t.Fatal("took too long to receive after unpause") - } -} - func TestServer_ModifyTx_corrupt(t *testing.T) { lg := zaptest.NewLogger(t) scheme := "unix"