Skip to content

Commit

Permalink
Added end2end test
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Mar 7, 2024
1 parent 65454f5 commit 4d6f00f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 121 deletions.
6 changes: 0 additions & 6 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,6 @@ func (c *controlBuffer) finish() {
c.mu.Unlock()
}

func (c *controlBuffer) setError(err error) {
c.mu.Lock()
c.err = err
c.mu.Unlock()
}

type side int

const (
Expand Down
37 changes: 4 additions & 33 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ type http2Client struct {
nextID uint32
registeredCompressors string

// goAwaySent is initialised with http2Client and fired when client
// transport is shutdown
goAwaySent *grpcsync.Event

// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
state transportState
Expand Down Expand Up @@ -331,7 +327,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
registeredCompressors: grpcutil.RegisteredCompressors(),
address: addr,
conn: conn,
goAwaySent: grpcsync.NewEvent(),
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
Expand Down Expand Up @@ -456,12 +451,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
// server sent a GOAWAY). For I/O errors, the reader will hit it
// after draining any remaining incoming data.
err = t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte("GOAWAY from client"))
if err != nil {
t.logger.Infof("Failed writing goaway %v", err)
}
t.conn.Close()
}
close(t.writerDone)
Expand Down Expand Up @@ -514,20 +512,6 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}

// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
// Send out a GOAWAY frame so server is aware of client transport shutdown
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
t.goAwaySent.Fire()
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
return false, err
}
return false, nil
}

func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
Expand Down Expand Up @@ -1003,19 +987,6 @@ func (t *http2Client) Close(err error) {
}
t.mu.Unlock()
t.controlBuf.finish()
t.controlBuf.setError(nil)
err1 := t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("GOAWAY from client"), headsUp: true})
if err1 != nil {
t.logger.Infof("Failed to put goaway to server: %v\n", err1)
}
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
select {
case <-t.goAwaySent.Done():
case <-timer.C:
case <-t.ctx.Done():
}
t.controlBuf.setError(ErrConnClosing)
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelzID)
Expand Down
82 changes: 0 additions & 82 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,88 +1188,6 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) {

}

// Test that a transport level client shutdown successfully sends a GOAWAY frame
// to underlying connection.
func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
// Create a server.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening: %v", err)
}
defer lis.Close()
// The success channel verifies that the server's reader goroutine received
// a GOAWAY frame from the client.
success := testutils.NewChannel()
// Launch the server.
go func() {
sconn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting: %v", err)
}
defer sconn.Close()
if _, err := io.ReadFull(sconn, make([]byte, len(clientPreface))); err != nil {
t.Errorf("Error while writing settings ack: %v", err)
return
}
sfr := http2.NewFramer(sconn, sconn)
if err := sfr.WriteSettings(); err != nil {
t.Errorf("Error while writing settings %v", err)
return
}
if err := sfr.WriteSettingsAck(); err != nil {
t.Errorf("Error while writing settings ack %v", err)
return
}

// Read frames off the wire. Should expect to see a GOAWAY frame after
// the client closes.
for {
frame, err := sfr.ReadFrame()
if err != nil {
return
}
switch frame.(type) {
case *http2.SettingsFrame:
// Do nothing. A settings frame is expected from client preface.
case *http2.GoAwayFrame:
// Records that the server successfully received a GOAWAY frame.
success.Send(nil)
return
default:
// The client should have sent nothing but settings and GOAWAY frame.
success.Send(errors.New("the server received a frame other than settings or GOAWAY"))
return
}
}
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()

copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
}
s1, err1 := ct.NewStream(connectCtx, callHdr)
if err1 != nil {
t.Fatalf("failed to open stream: %v", err1)
}
if s1.id != 1 {
t.Fatalf("wrong stream id: %d", s1.id)
}
ct.Close(errors.New("manually closed by client"))
t.Logf("Closed the client connection")
if e, err := success.Receive(connectCtx); e != nil || err != nil {
t.Fatalf("Error in frame received: %v. Error receiving from channel: %v", e, err)
} else {
t.Logf("Server received the GOAWAY from client")
}
}

func (s) TestServerWithMisbehavedClient(t *testing.T) {
server := setUpServerOnly(t, 0, &ServerConfig{}, suspended)
defer server.stop()
Expand Down
47 changes: 47 additions & 0 deletions test/goaway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,3 +761,50 @@ func (s) TestTwoGoAwayPingFrames(t *testing.T) {
t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
}
}

func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
defer lis.Close()
s := grpc.NewServer()
defer s.Stop()
go s.Serve(lis)

conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}

st := newServerTesterFromConn(t, conn)
st.greet()
goAwayFrameReceived := testutils.NewChannel()
go func() {
for {
f, err := st.readFrame()
if err != nil {
return
}
switch f.(type) {
case *http2.GoAwayFrame:
t.Logf("Received GoAway Frame")
goAwayFrameReceived.Send(nil)
default:
t.Logf("server tester received unexpected frame type %T", f)
}
}
}()
go func() {
s.GracefulStop()
}()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := goAwayFrameReceived.Receive(ctx); err != nil {
t.Fatalf("Error sending GOAWAY from client: %v", err)
}
// Write Ping
st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
// Close the conn to finish up the Shutdown process.
conn.Close()
}

0 comments on commit 4d6f00f

Please sign in to comment.