Skip to content

Commit

Permalink
Merge #30508
Browse files Browse the repository at this point in the history
30508: pgwire: make conn hold most fields by value, not reference r=nvanbenschoten a=nvanbenschoten

This allows their lifetimes to be more obviously scoped together, reduces the number of pointers we have lying around for each SQL connection, and enables future optimization where we can put entire `pgwire.conn` objects in a pool.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Sep 25, 2018
2 parents fc5ee01 + ab71616 commit f06556d
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 16 deletions.
8 changes: 7 additions & 1 deletion pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,15 @@ var _ Command = SendError{}
// NewStmtBuf creates a StmtBuf.
func NewStmtBuf() *StmtBuf {
var buf StmtBuf
buf.Init()
return &buf
}

// Init initializes a StmtBuf. It exists to avoid the allocation imposed by
// NewStmtBuf.
func (buf *StmtBuf) Init() {
buf.mu.lastPos = -1
buf.mu.cond = sync.NewCond(&buf.mu.Mutex)
return &buf
}

// Close marks the buffer as closed. Once Close() is called, no further push()es
Expand Down
18 changes: 9 additions & 9 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type conn struct {
rd bufio.Reader

// stmtBuf is populated with commands queued for execution by this conn.
stmtBuf *sql.StmtBuf
stmtBuf sql.StmtBuf

// err is an error, accessed atomically. It represents any error encountered
// while accessing the underlying network connection. This can read via
Expand All @@ -92,7 +92,7 @@ type conn struct {
}

readBuf pgwirebase.ReadBuffer
msgBuilder *writeBuffer
msgBuilder writeBuffer
}

// serveConn creates a conn that will serve the netConn. It returns once the
Expand Down Expand Up @@ -169,16 +169,16 @@ func newConn(
) *conn {
c := &conn{
conn: netConn,
stmtBuf: sql.NewStmtBuf(),
sessionArgs: sArgs,
msgBuilder: newWriteBuffer(metrics.BytesOutCount),
execCfg: execCfg,
metrics: metrics,
rd: *bufio.NewReader(netConn),
execCfg: execCfg,
}
c.stmtBuf.Init()
c.writerState.fi.buf = &c.writerState.buf
c.writerState.fi.lastFlushed = -1
c.writerState.fi.cmdStarts = make(map[sql.CmdPos]int)
c.msgBuilder.init(metrics.BytesOutCount)

return c
}
Expand Down Expand Up @@ -266,7 +266,7 @@ func (c *conn) serveImpl(
wg.Add(1)
go func() {
writerErr = sqlServer.ServeConn(
processorCtx, c.sessionArgs, c.stmtBuf, c, reserved, c.metrics.SQLMemMetrics, stopProcessor)
processorCtx, c.sessionArgs, &c.stmtBuf, c, reserved, c.metrics.SQLMemMetrics, stopProcessor)
// TODO(andrei): Should we sometimes transmit the writerErr's to the
// client?
wg.Done()
Expand Down Expand Up @@ -377,7 +377,7 @@ Loop:
// and flushing the buffer.
if ctxCanceled || draining() {
_ /* err */ = writeErr(
newAdminShutdownErr(err), c.msgBuilder, &c.writerState.buf)
newAdminShutdownErr(err), &c.msgBuilder, &c.writerState.buf)
_ /* n */, _ /* err */ = c.writerState.buf.WriteTo(c.conn)

// Swallow whatever error we might have gotten from the writer. If we're
Expand Down Expand Up @@ -900,7 +900,7 @@ func (c *conn) bufferCommandComplete(tag []byte) {
}

func (c *conn) bufferErr(err error) {
if err := writeErr(err, c.msgBuilder, &c.writerState.buf); err != nil {
if err := writeErr(err, &c.msgBuilder, &c.writerState.buf); err != nil {
panic(fmt.Sprintf("unexpected err from buffer: %s", err))
}
}
Expand Down Expand Up @@ -1224,7 +1224,7 @@ func (r *pgwireReader) ReadByte() (byte, error) {
func (c *conn) handleAuthentication(ctx context.Context, insecure bool) error {

sendError := func(err error) error {
_ /* err */ = writeErr(err, c.msgBuilder, c.conn)
_ /* err */ = writeErr(err, &c.msgBuilder, c.conn)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestConn(t *testing.T) {

// Now we'll expect to receive the commands corresponding to the operations in
// client().
rd := sql.MakeStmtBufReader(conn.stmtBuf)
rd := sql.MakeStmtBufReader(&conn.stmtBuf)
expectExecStmt(ctx, t, "SELECT 1", &rd, conn, queryStringComplete)
expectSync(ctx, t, &rd)
expectExecStmt(ctx, t, "SELECT 2", &rd, conn, queryStringComplete)
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestConn(t *testing.T) {
// processPgxStartup processes the first few queries that the pgx driver
// automatically sends on a new connection that has been established.
func processPgxStartup(ctx context.Context, s serverutils.TestServerInterface, c *conn) error {
rd := sql.MakeStmtBufReader(c.stmtBuf)
rd := sql.MakeStmtBufReader(&c.stmtBuf)

for {
cmd, err := rd.CurCmd()
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/pgwire/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,17 @@ type writeBuffer struct {
}

func newWriteBuffer(bytecount *metric.Counter) *writeBuffer {
b := &writeBuffer{
bytecount: bytecount,
}
b.textFormatter = tree.MakeFmtCtx(&b.variablePutbuf, tree.FmtPgwireText)
b := new(writeBuffer)
b.init(bytecount)
return b
}

// init exists to avoid the allocation imposed by newWriteBuffer.
func (b *writeBuffer) init(bytecount *metric.Counter) {
b.bytecount = bytecount
b.textFormatter = tree.MakeFmtCtx(&b.variablePutbuf, tree.FmtPgwireText)
}

// Write implements the io.Write interface.
func (b *writeBuffer) Write(p []byte) (int, error) {
b.write(p)
Expand Down

0 comments on commit f06556d

Please sign in to comment.