Skip to content

Commit

Permalink
replace unprocessed and bytesReceived with atomic.Bools
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Nov 4, 2024
1 parent 660dbc3 commit 9c369a6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
8 changes: 4 additions & 4 deletions internal/transport/client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type ClientStream struct {
header metadata.MD // the received header metadata
noHeaders bool // set if the client never received headers (set only after the stream is done).

bytesReceived uint32 // indicates whether any bytes have been received on this stream
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream

status *status.Status // the status error received from the server
}
Expand Down Expand Up @@ -79,13 +79,13 @@ func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOption

// BytesReceived indicates whether any bytes have been received on this stream.
func (s *ClientStream) BytesReceived() bool {
return atomic.LoadUint32(&s.bytesReceived) == 1
return s.bytesReceived.Load()
}

// Unprocessed indicates whether the server did not process this stream --
// i.e. it sent a refused stream or GOAWAY including this stream ID.
func (s *ClientStream) Unprocessed() bool {
return atomic.LoadUint32(&s.unprocessed) == 1
return s.unprocessed.Load()
}

func (s *ClientStream) waitOnHeader() {
Expand Down
8 changes: 4 additions & 4 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
return
}
// The stream was unprocessed by the server.
atomic.StoreUint32(&s.unprocessed, 1)
s.unprocessed.Store(true)
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
}
if f.ErrCode == http2.ErrCodeRefusedStream {
// The stream was unprocessed by the server.
atomic.StoreUint32(&s.unprocessed, 1)
s.unprocessed.Store(true)
}
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
Expand Down Expand Up @@ -1376,7 +1376,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
stream.unprocessed.Store(true)
streamsToClose = append(streamsToClose, stream)
}
}
Expand Down Expand Up @@ -1428,7 +1428,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
s.bytesReceived.Store(true)
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0

if !initialHeader && !endStream {
Expand Down

0 comments on commit 9c369a6

Please sign in to comment.