Skip to content

Commit

Permalink
ccl/sqlproxyccl: update PeekMsg to return message size instead of bod…
Browse files Browse the repository at this point in the history
…y size

Informs cockroachdb#76000. Follow-up to cockroachdb#76006.

Previously, PeekMsg was returning the body size (excluding header size), which
is a bit awkward from an API point of view because most callers of PeekMsg
immediately adds the header size to the returned size previously. This commit
cleans the API design up by making PeekMsg return the message size instead,
i.e. header inclusive. At the same time, returning the message size makes it
consistent with the ReadMsg API since that returns the entire message.

Release note: None
  • Loading branch information
jaylim-crl committed Feb 15, 2022
1 parent 0c1b7a7 commit d6348f2
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestBackendInterceptor(t *testing.T) {
typ, size, err := bi.PeekMsg()
require.NoError(t, err)
require.Equal(t, pgwirebase.ClientMsgSimpleQuery, typ)
require.Equal(t, 9, size)
require.Equal(t, 14, size)

bi.Close()
typ, size, err = bi.PeekMsg()
Expand Down
27 changes: 14 additions & 13 deletions pkg/ccl/sqlproxyccl/interceptor/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func newPgInterceptor(src io.Reader, dst io.Writer, bufSize int) (*pgInterceptor

// PeekMsg returns the header of the current pgwire message without advancing
// the interceptor. On return, err == nil if and only if the entire header can
// be read. Note that size corresponds to the body size, and does not account
// for the size field itself. This will return ErrProtocolError if the packets
// are malformed.
// be read. The returned size corresponds to the entire message size, which
// includes the header type and body length. This will return ErrProtocolError
// if the packets are malformed.
//
// If the interceptor is closed, PeekMsg returns ErrInterceptorClosed.
func (p *pgInterceptor) PeekMsg() (typ byte, size int, err error) {
Expand All @@ -108,7 +108,9 @@ func (p *pgInterceptor) PeekMsg() (typ byte, size int, err error) {
return 0, 0, ErrProtocolError
}

return typ, size - 4, nil
// Add 1 to size to account for type. We don't need to add 4 (int length) to
// it because size is already inclusive of that.
return typ, size + 1, nil
}

// WriteMsg writes the given bytes to the writer dst. If err != nil and a Write
Expand Down Expand Up @@ -148,28 +150,27 @@ func (p *pgInterceptor) ReadMsg() (msg []byte, err error) {
return nil, ErrInterceptorClosed
}

// Peek header of the current message for body size.
// Peek header of the current message for message size.
_, size, err := p.PeekMsg()
if err != nil {
return nil, err
}
msgSizeBytes := pgHeaderSizeBytes + size

// Can the entire message fit into the buffer?
if msgSizeBytes <= len(p.buf) {
if err := p.ensureNextNBytes(msgSizeBytes); err != nil {
if size <= len(p.buf) {
if err := p.ensureNextNBytes(size); err != nil {
// Possibly due to a timeout or context cancellation.
return nil, err
}

// Return a slice to the internal buffer to avoid an allocation here.
retBuf := p.buf[p.readPos : p.readPos+msgSizeBytes]
p.readPos += msgSizeBytes
retBuf := p.buf[p.readPos : p.readPos+size]
p.readPos += size
return retBuf, nil
}

// Message cannot fit, so we will have to allocate.
msg = make([]byte, msgSizeBytes)
msg = make([]byte, size)

// Copy bytes which have already been read.
n := copy(msg, p.buf[p.readPos:p.writePos])
Expand Down Expand Up @@ -209,15 +210,15 @@ func (p *pgInterceptor) ForwardMsg() (n int, err error) {
return 0, ErrInterceptorClosed
}

// Retrieve header of the current message for body size.
// Retrieve header of the current message for message size.
_, size, err := p.PeekMsg()
if err != nil {
return 0, err
}

// Handle overflows as current message may not fit in the current buffer.
startPos := p.readPos
endPos := startPos + pgHeaderSizeBytes + size
endPos := startPos + size
remainingBytes := 0
if endPos > p.writePos {
remainingBytes = endPos - p.writePos
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/sqlproxyccl/interceptor/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func TestPGInterceptor_PeekMsg(t *testing.T) {

t.Run("successful", func(t *testing.T) {
buf := new(bytes.Buffer)
_, err := buf.Write((&pgproto3.Query{String: "SELECT 1"}).Encode(nil))
msgBytes := (&pgproto3.Query{String: "SELECT 1"}).Encode(nil)
_, err := buf.Write(msgBytes)
require.NoError(t, err)

pgi, err := newPgInterceptor(buf, nil /* dst */, 10)
Expand All @@ -104,14 +105,14 @@ func TestPGInterceptor_PeekMsg(t *testing.T) {
typ, size, err := pgi.PeekMsg()
require.NoError(t, err)
require.Equal(t, byte(pgwirebase.ClientMsgSimpleQuery), typ)
require.Equal(t, 9, size)
require.Equal(t, len(msgBytes), size)
require.Equal(t, 4, buf.Len())

// Invoking Peek should not advance the interceptor.
typ, size, err = pgi.PeekMsg()
require.NoError(t, err)
require.Equal(t, byte(pgwirebase.ClientMsgSimpleQuery), typ)
require.Equal(t, 9, size)
require.Equal(t, len(msgBytes), size)
require.Equal(t, 4, buf.Len())
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestFrontendInterceptor(t *testing.T) {
typ, size, err := fi.PeekMsg()
require.NoError(t, err)
require.Equal(t, pgwirebase.ServerMsgReady, typ)
require.Equal(t, 1, size)
require.Equal(t, 6, size)

fi.Close()
typ, size, err = fi.PeekMsg()
Expand Down

0 comments on commit d6348f2

Please sign in to comment.