Skip to content

Commit

Permalink
Remove PauseAccept of the reverse proxy from the e2e test
Browse files Browse the repository at this point in the history
Part of the patches to fix etcd-io#17737

During the development of etcd-io#17938,
we agreed that during the transition to L7 forward proxy, unused
features and features targeting L4 reverse proxy will be dropped.
  • Loading branch information
henrybear327 committed Sep 25, 2024
1 parent 1a08fb2 commit 5212b13
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 111 deletions.
64 changes: 2 additions & 62 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ type Server interface {
// Close closes listener and transport.
Close() error

// PauseAccept stops accepting new connections.
PauseAccept()
// UnpauseAccept removes pause operation on accepting new connections.
UnpauseAccept()

// DelayAccept adds latency ± random variable to accepting
// new incoming connections.
DelayAccept(latency, rv time.Duration)
Expand Down Expand Up @@ -115,16 +110,6 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
UnpauseTx()

// PauseRx stops "receiving" packets; "incoming" traffic blocks.
PauseRx()
// UnpauseRx removes "receiving" pause operation.
UnpauseRx()

// ResetListener closes and restarts listener.
ResetListener() error
}
Expand Down Expand Up @@ -164,9 +149,6 @@ type server struct {
listenerMu sync.RWMutex
listener net.Listener

pauseAcceptMu sync.Mutex
pauseAcceptc chan struct{}

latencyAcceptMu sync.RWMutex
latencyAccept time.Duration

Expand Down Expand Up @@ -208,9 +190,8 @@ func NewServer(cfg ServerConfig) Server {
donec: make(chan struct{}),
errc: make(chan error, 16),

pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand All @@ -233,7 +214,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}

close(s.pauseAcceptc)
close(s.pauseTxc)
close(s.pauseRxc)

Expand Down Expand Up @@ -290,15 +270,6 @@ func (s *server) listenAndServe() {
close(s.readyc)

for {
s.pauseAcceptMu.Lock()
pausec := s.pauseAcceptc
s.pauseAcceptMu.Unlock()
select {
case <-pausec:
case <-s.donec:
return
}

s.latencyAcceptMu.RLock()
lat := s.latencyAccept
s.latencyAcceptMu.RUnlock()
Expand Down Expand Up @@ -645,37 +616,6 @@ func (s *server) Close() (err error) {
return err
}

func (s *server) PauseAccept() {
s.pauseAcceptMu.Lock()
s.pauseAcceptc = make(chan struct{})
s.pauseAcceptMu.Unlock()

s.lg.Info(
"paused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseAccept() {
s.pauseAcceptMu.Lock()
select {
case <-s.pauseAcceptc: // already unpaused
case <-s.donec:
s.pauseAcceptMu.Unlock()
return
default:
close(s.pauseAcceptc)
}
s.pauseAcceptMu.Unlock()

s.lg.Info(
"unpaused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
Expand Down
49 changes: 0 additions & 49 deletions pkg/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,55 +234,6 @@ func testServerDelayAccept(t *testing.T, secure bool) {
}
}

func TestServer_PauseTx(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
srcAddr, dstAddr := newUnixAddr(), newUnixAddr()
defer func() {
os.RemoveAll(srcAddr)
os.RemoveAll(dstAddr)
}()
ln := listen(t, scheme, dstAddr, transport.TLSInfo{})
defer ln.Close()

p := NewServer(ServerConfig{
Logger: lg,
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})

waitForServer(t, p)

defer p.Close()

p.PauseTx()

data := []byte("Hello World!")
send(t, data, scheme, srcAddr, transport.TLSInfo{})

recvc := make(chan []byte, 1)
go func() {
recvc <- receive(t, ln)
}()

select {
case d := <-recvc:
t.Fatalf("received unexpected data %q during pause", string(d))
case <-time.After(200 * time.Millisecond):
}

p.UnpauseTx()

select {
case d := <-recvc:
if !bytes.Equal(data, d) {
t.Fatalf("expected %q, got %q", string(data), string(d))
}
case <-time.After(2 * time.Second):
t.Fatal("took too long to receive after unpause")
}
}

func TestServer_ModifyTx_corrupt(t *testing.T) {
lg := zaptest.NewLogger(t)
scheme := "unix"
Expand Down

0 comments on commit 5212b13

Please sign in to comment.