Skip to content

Commit

Permalink
sqlproxyccl: add tests for query cancellation
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
rafiss committed Aug 5, 2022
1 parent 043ea4a commit ac56822
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ func (c *connector) OpenTenantConnWithToken(
// Since this method is only used during connection migration (i.e. proxy
// is connecting to the SQL pod), we'll discard all of the messages, and
// only return once we've seen a ReadyForQuery message.
//
// NOTE: This will need to be updated when we implement query cancellation.
newBackendKeyData, err := readTokenAuthResult(serverConn)
if err != nil {
return nil, err
Expand Down
26 changes: 20 additions & 6 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func TestConnector_OpenTenantConnWithToken(t *testing.T) {

defer testutils.TestingHook(
&readTokenAuthResult,
func(serverConn net.Conn) error {
return errors.New("bar")
func(serverConn net.Conn) (*pgproto3.BackendKeyData, error) {
return nil, errors.New("bar")
},
)()

Expand All @@ -114,9 +114,18 @@ func TestConnector_OpenTenantConnWithToken(t *testing.T) {
StartupMsg: &pgproto3.StartupMessage{
Parameters: make(map[string]string),
},
CancelInfo: makeCancelInfo(
&net.TCPAddr{IP: net.IP{4, 5, 6, 7}},
&net.TCPAddr{IP: net.IP{11, 22, 33, 44}},
),
}
pipeConn, _ := net.Pipe()
defer pipeConn.Close()
conn := &fakeTCPConn{
Conn: pipeConn,
remoteAddr: &net.TCPAddr{IP: net.IP{1, 2, 3, 4}},
localAddr: &net.TCPAddr{IP: net.IP{4, 5, 6, 7}},
}
conn, _ := net.Pipe()
defer conn.Close()

var openCalled bool
c.testingKnobs.dialTenantCluster = func(
Expand All @@ -134,12 +143,16 @@ func TestConnector_OpenTenantConnWithToken(t *testing.T) {
}

var authCalled bool
crdbBackendKeyData := &pgproto3.BackendKeyData{
ProcessID: 4,
SecretKey: 5,
}
defer testutils.TestingHook(
&readTokenAuthResult,
func(serverConn net.Conn) error {
func(serverConn net.Conn) (*pgproto3.BackendKeyData, error) {
authCalled = true
require.Equal(t, conn, serverConn)
return nil
return crdbBackendKeyData, nil
},
)()

Expand All @@ -148,6 +161,7 @@ func TestConnector_OpenTenantConnWithToken(t *testing.T) {
require.True(t, authCalled)
require.NoError(t, err)
require.Equal(t, conn, crdbConn)
require.Equal(t, crdbBackendKeyData, c.CancelInfo.mu.origBackendKeyData)

// Ensure that token is deleted.
_, ok := c.StartupMsg.Parameters[sessionRevivalTokenStartupParam]
Expand Down
12 changes: 9 additions & 3 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type ProxyOptions struct {

// balancerOpts is used to customize the balancer created by the proxy.
balancerOpts []balancer.Option

httpCancelErrHandler func(err error)
}
}

Expand Down Expand Up @@ -491,21 +493,25 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
// handleCancelRequest handles a pgwire query cancel request by either
// forwarding it to a SQL node or to another proxy.
func (handler *proxyHandler) handleCancelRequest(cr *proxyCancelRequest, allowForward bool) error {
const timeout = 2 * time.Second
if ci, ok := handler.cancelInfoMap.getCancelInfo(cr.SecretKey); ok {
return ci.sendCancelToBackend(cr.ClientIP)
}
// Only forward the request if it hasn't already been sent to the correct proxy.
if !allowForward {
return nil
}
u := "https://" + cr.ProxyIP.String() + ":8080/_status/cancel"
u := "http://" + cr.ProxyIP.String() + ":8080/_status/cancel/"
reqBody := bytes.NewReader(cr.Encode())
return forwardCancelRequest(u, reqBody)
}

var forwardCancelRequest = func(url string, reqBody *bytes.Reader) error {
const timeout = 2 * time.Second
client := http.Client{
Timeout: timeout,
}

if _, err := client.Post(u, "application/octet-stream", reqBody); err != nil {
if _, err := client.Post(url, "application/octet-stream", reqBody); err != nil {
return err
}
return nil
Expand Down
Loading

0 comments on commit ac56822

Please sign in to comment.