diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 6a74c805008c..8ed347c54195 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -44,8 +44,8 @@ type ClientStream struct { header metadata.MD // the received header metadata noHeaders bool // set if the client never received headers (set only after the stream is done). - bytesReceived uint32 // indicates whether any bytes have been received on this stream - unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream + bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream + unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream status *status.Status // the status error received from the server } @@ -79,13 +79,13 @@ func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOption // BytesReceived indicates whether any bytes have been received on this stream. func (s *ClientStream) BytesReceived() bool { - return atomic.LoadUint32(&s.bytesReceived) == 1 + return s.bytesReceived.Load() } // Unprocessed indicates whether the server did not process this stream -- // i.e. it sent a refused stream or GOAWAY including this stream ID. func (s *ClientStream) Unprocessed() bool { - return atomic.LoadUint32(&s.unprocessed) == 1 + return s.unprocessed.Load() } func (s *ClientStream) waitOnHeader() { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 7e2e8d08a366..e08627ad01db 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -765,7 +765,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS return } // The stream was unprocessed by the server. - atomic.StoreUint32(&s.unprocessed, 1) + s.unprocessed.Store(true) s.write(recvMsg{err: err}) close(s.done) // If headerChan isn't closed, then close it. @@ -1231,7 +1231,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { } if f.ErrCode == http2.ErrCodeRefusedStream { // The stream was unprocessed by the server. - atomic.StoreUint32(&s.unprocessed, 1) + s.unprocessed.Store(true) } statusCode, ok := http2ErrConvTab[f.ErrCode] if !ok { @@ -1376,7 +1376,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error { for streamID, stream := range t.activeStreams { if streamID > id && streamID <= upperLimit { // The stream was unprocessed by the server. - atomic.StoreUint32(&stream.unprocessed, 1) + stream.unprocessed.Store(true) streamsToClose = append(streamsToClose, stream) } } @@ -1428,7 +1428,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } endStream := frame.StreamEnded() - atomic.StoreUint32(&s.bytesReceived, 1) + s.bytesReceived.Store(true) initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0 if !initialHeader && !endStream {