Skip to content

Commit

Permalink
ccl/sqlproxyccl: add metric that records the transfer response messag…
Browse files Browse the repository at this point in the history
…e size

Informs #76000, and follow up to #76805.

This commit adds a new proxy.conn_migration.transfer_response.message_size
metric that will track the distribution of the transfer response message size.
This will be used to tune a value for the SQL cluster setting:
sql.session_transfer.max_session_size.

Release justification: Low-risk metric-only change within sqlproxy.

Release note: None
  • Loading branch information
jaylim-crl committed Mar 13, 2022
1 parent 471a9b9 commit ba4c58e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 29 deletions.
30 changes: 23 additions & 7 deletions pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (f *forwarder) runTransfer() (retErr error) {

// Transfer the connection.
clientConn, serverConn := f.getConns()
newServerConn, err := transferConnection(ctx, f.connector, clientConn, serverConn)
newServerConn, err := transferConnection(ctx, f.connector, f.metrics, clientConn, serverConn)
if err != nil {
return errors.Wrap(err, "transferring connection")
}
Expand All @@ -183,7 +183,10 @@ func (f *forwarder) runTransfer() (retErr error) {
// connection, and returns the a new connection to the server that the
// connection got transferred to.
func transferConnection(
ctx *transferContext, connector *connector, clientConn, serverConn *interceptor.PGConn,
ctx *transferContext,
connector *connector,
metrics *metrics,
clientConn, serverConn *interceptor.PGConn,
) (_ *interceptor.PGConn, retErr error) {
ctx.markRecoverable(true)

Expand All @@ -202,7 +205,7 @@ func transferConnection(
}

transferErr, state, revivalToken, err := waitForShowTransferState(
ctx, serverConn.ToFrontendConn(), clientConn, transferKey)
ctx, serverConn.ToFrontendConn(), clientConn, transferKey, metrics)
if err != nil {
return nil, errors.Wrap(err, "waiting for transfer state")
}
Expand Down Expand Up @@ -310,6 +313,9 @@ var runShowTransferState = func(w io.Writer, transferKey string) error {
// Since ReadyForQuery may be for a previous pipelined query, this handles the
// forwarding of messages back to the client in case we don't see our state yet.
//
// metrics is optional, and if not nil, it will be used to record the transfer
// response message size in ConnMigrationTransferResponseMessageSize.
//
// WARNING: When using this, we assume that no other goroutines are using both
// serverConn and clientConn. In the context of a transfer, the response
// processor must be blocked to avoid concurrent reads from serverConn.
Expand All @@ -318,6 +324,7 @@ var waitForShowTransferState = func(
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
metrics *metrics,
) (transferErr string, state string, revivalToken string, retErr error) {
// Wait for a response that looks like the following:
//
Expand Down Expand Up @@ -362,7 +369,7 @@ var waitForShowTransferState = func(
}

// 2. Read DataRow.
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, size int) bool {
// This has to be 4 since we validated RowDescription earlier.
if len(msg.Values) != 4 {
return false
Expand All @@ -380,6 +387,11 @@ var waitForShowTransferState = func(
// referenced in msg will no longer be valid once we read the next pgwire
// message.
transferErr, state, revivalToken = string(msg.Values[0]), string(msg.Values[1]), string(msg.Values[2])

// Since the DataRow is valid, record response message size.
if metrics != nil {
metrics.ConnMigrationTransferResponseMessageSize.RecordValue(int64(size))
}
return true
}); err != nil {
return "", "", "", errors.Wrap(err, "expecting DataRow")
Expand Down Expand Up @@ -447,7 +459,7 @@ var runAndWaitForDeserializeSession = func(
}

// 2. Read DataRow.
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow) bool {
if err := expectDataRow(ctx, serverConn, func(msg *pgproto3.DataRow, _ int) bool {
return len(msg.Values) == 1 && string(msg.Values[0]) == "t"
}); err != nil {
return errors.Wrap(err, "expecting DataRow")
Expand Down Expand Up @@ -564,11 +576,15 @@ func waitForSmallRowDescription(
func expectDataRow(
ctx context.Context,
serverConn *interceptor.FrontendConn,
validateFn func(*pgproto3.DataRow) bool,
validateFn func(*pgproto3.DataRow, int) bool,
) error {
if ctx.Err() != nil {
return ctx.Err()
}
_, size, err := serverConn.PeekMsg()
if err != nil {
return errors.Wrap(err, "peeking message")
}
msg, err := serverConn.ReadMsg()
if err != nil {
return errors.Wrap(err, "reading message")
Expand All @@ -577,7 +593,7 @@ func expectDataRow(
if !ok {
return errors.Newf("unexpected message: %v", jsonOrRaw(msg))
}
if !validateFn(pgMsg) {
if !validateFn(pgMsg, size) {
return errors.Newf("validation failed for message: %v", jsonOrRaw(msg))
}
return nil
Expand Down
29 changes: 22 additions & 7 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestTransferConnection(t *testing.T) {
ctx, cancel := newTransferContext(context.Background())
cancel()

conn, err := transferConnection(ctx, nil, nil, nil)
conn, err := transferConnection(ctx, nil, nil, nil, nil)
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, conn)
require.True(t, ctx.isRecoverable())
Expand All @@ -131,6 +131,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -157,6 +158,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -169,6 +171,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -195,6 +198,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -207,6 +211,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
nil,
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -233,6 +238,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand All @@ -256,6 +262,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand All @@ -282,6 +289,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand Down Expand Up @@ -321,6 +329,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand Down Expand Up @@ -351,6 +360,7 @@ func TestTransferConnection(t *testing.T) {
serverConn *interceptor.FrontendConn,
clientConn io.Writer,
transferKey string,
_ *metrics,
) (string, string, string, error) {
require.Equal(t, ctx, tCtx)
require.NotNil(t, serverConn)
Expand Down Expand Up @@ -390,6 +400,7 @@ func TestTransferConnection(t *testing.T) {
conn, err := transferConnection(
ctx,
&connector{},
nil,
interceptor.NewPGConn(p1),
interceptor.NewPGConn(p2),
)
Expand Down Expand Up @@ -476,7 +487,7 @@ func TestWaitForShowTransferState(t *testing.T) {
tCtx, cancel := context.WithCancel(ctx)
cancel()

transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "")
transferErr, state, token, err := waitForShowTransferState(tCtx, nil, nil, "", nil)
require.True(t, errors.Is(err, context.Canceled))
require.Equal(t, "", transferErr)
require.Equal(t, "", state)
Expand Down Expand Up @@ -753,6 +764,7 @@ func TestWaitForShowTransferState(t *testing.T) {
interceptor.NewFrontendConn(serverProxy),
clientProxy,
"foo-transfer-key",
nil,
)
if tc.err == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -1069,7 +1081,7 @@ func TestExpectDataRow(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

falseValidateFn := func(m *pgproto3.DataRow) bool { return false }
falseValidateFn := func(m *pgproto3.DataRow, s int) bool { return false }

t.Run("context_cancelled", func(t *testing.T) {
tCtx, cancel := context.WithCancel(ctx)
Expand All @@ -1085,7 +1097,7 @@ func TestExpectDataRow(t *testing.T) {
w.Close()

err := expectDataRow(ctx, interceptor.NewFrontendConn(r), falseValidateFn)
require.Regexp(t, "reading message", err)
require.Regexp(t, "peeking message", err)
})

t.Run("type_mismatch", func(t *testing.T) {
Expand Down Expand Up @@ -1119,15 +1131,18 @@ func TestExpectDataRow(t *testing.T) {
defer r.Close()
defer w.Close()

msg := &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}}
go func() {
writeServerMsg(w, &pgproto3.DataRow{Values: [][]byte{[]byte("foo")}})
writeServerMsg(w, msg)
}()

err := expectDataRow(
ctx,
interceptor.NewFrontendConn(r),
func(m *pgproto3.DataRow) bool {
return len(m.Values) == 1 && string(m.Values[0]) == "foo"
func(m *pgproto3.DataRow, size int) bool {
return len(m.Values) == 1 &&
string(m.Values[0]) == "foo" &&
len(msg.Encode(nil)) == size
},
)
require.Nil(t, err)
Expand Down
54 changes: 39 additions & 15 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,40 @@ import (

// metrics contains pointers to the metrics for monitoring proxy operations.
type metrics struct {
BackendDisconnectCount *metric.Counter
IdleDisconnectCount *metric.Counter
BackendDownCount *metric.Counter
ClientDisconnectCount *metric.Counter
CurConnCount *metric.Gauge
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
SuccessfulConnCount *metric.Counter
AuthFailedCount *metric.Counter
ExpiredClientConnCount *metric.Counter
ConnMigrationSuccessCount *metric.Counter
ConnMigrationErrorFatalCount *metric.Counter
ConnMigrationErrorRecoverableCount *metric.Counter
ConnMigrationAttemptedCount *metric.Counter
ConnMigrationAttemptedLatency *metric.Histogram
BackendDisconnectCount *metric.Counter
IdleDisconnectCount *metric.Counter
BackendDownCount *metric.Counter
ClientDisconnectCount *metric.Counter
CurConnCount *metric.Gauge
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
SuccessfulConnCount *metric.Counter
AuthFailedCount *metric.Counter
ExpiredClientConnCount *metric.Counter

ConnMigrationSuccessCount *metric.Counter
ConnMigrationErrorFatalCount *metric.Counter
ConnMigrationErrorRecoverableCount *metric.Counter
ConnMigrationAttemptedCount *metric.Counter
ConnMigrationAttemptedLatency *metric.Histogram
ConnMigrationTransferResponseMessageSize *metric.Histogram
}

// MetricStruct implements the metrics.Struct interface.
func (metrics) MetricStruct() {}

var _ metric.Struct = metrics{}

const (
// maxExpectedTransferResponseMessageSize corresponds to maximum expected
// response message size for the SHOW TRANSFER STATE query. We choose 16MB
// here to match the defaultMaxReadBufferSize used for ingesting SQL
// statements in the SQL server (see pkg/sql/pgwire/pgwirebase/encoding.go).
//
// This will be used to tune sql.session_transfer.max_session_size.
maxExpectedTransferResponseMessageSize = 1 << 24 // 16MB
)

var (
metaCurConnCount = metric.Metadata{
Name: "proxy.sql.conns",
Expand Down Expand Up @@ -134,6 +146,12 @@ var (
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaConnMigrationTransferResponseMessageSize = metric.Metadata{
Name: "proxy.conn_migration.transfer_response.message_size",
Help: "Message size for the SHOW TRANSFER STATE response",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// makeProxyMetrics instantiates the metrics holder for proxy monitoring.
Expand All @@ -158,6 +176,12 @@ func makeProxyMetrics() metrics {
metaConnMigrationAttemptedLatency,
base.DefaultHistogramWindowInterval(),
),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(
metaConnMigrationTransferResponseMessageSize,
base.DefaultHistogramWindowInterval(),
maxExpectedTransferResponseMessageSize,
1,
),
}
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ func TestConnectionMigration(t *testing.T) {
)
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationAttemptedLatency.TotalCount())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
})

// Transfers should fail if there is an open transaction. These failed
Expand Down Expand Up @@ -999,6 +1001,8 @@ func TestConnectionMigration(t *testing.T) {
require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationAttemptedLatency.TotalCount())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())

// We have already asserted metrics above, so transfer must have
// been completed.
Expand Down Expand Up @@ -1028,6 +1032,8 @@ func TestConnectionMigration(t *testing.T) {
require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationAttemptedLatency.TotalCount())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())

// We have already asserted metrics above, so transfer must have
// been completed.
Expand Down Expand Up @@ -1133,6 +1139,11 @@ func TestConnectionMigration(t *testing.T) {
require.NotNil(t, f.ctx.Err())
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count(),
f.metrics.ConnMigrationAttemptedLatency.TotalCount())

// Here, we get a transfer timeout in response, so the message size
// should not be recorded.
require.Equal(t, f.metrics.ConnMigrationAttemptedCount.Count()-1,
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
})
}

Expand Down

0 comments on commit ba4c58e

Please sign in to comment.