diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 136afa80dfd4..0e7b3e3aa632 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -14,7 +14,6 @@ import ( "encoding/hex" "fmt" "net/url" - "reflect" "sort" "strconv" "strings" @@ -660,12 +659,10 @@ func TestTenantStatusCancelSession(t *testing.T) { sqlPod0.Exec(t, "SELECT 1") // See the session over HTTP on tenant SQL pod 1. - httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1) - require.NoError(t, err) + httpPod1 := helper.testCluster().tenantHTTPClient(t, 1) defer httpPod1.Close() listSessionsResp := serverpb.ListSessionsResponse{} - err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp) - require.NoError(t, err) + httpPod1.GetJSON("/_status/sessions", &listSessionsResp) var session serverpb.Session for _, s := range listSessionsResp.Sessions { if s.LastActiveQuery == "SELECT 1" { @@ -676,24 +673,27 @@ func TestTenantStatusCancelSession(t *testing.T) { require.NotNil(t, session.ID, "session not found") // See the session over SQL on tenant SQL pod 0. - require.Contains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID)) + sessionID := hex.EncodeToString(session.ID) + require.Eventually(t, func() bool { + return strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + }, 5*time.Second, 100*time.Millisecond) // Cancel the session over HTTP from tenant SQL pod 1. cancelSessionReq := serverpb.CancelSessionRequest{SessionID: session.ID} cancelSessionResp := serverpb.CancelSessionResponse{} - err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) require.Equal(t, true, cancelSessionResp.Canceled, cancelSessionResp.Error) // No longer see the session over SQL from tenant SQL pod 0. // (The SQL client maintains an internal connection pool and automatically reconnects.) - require.NotContains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID)) + require.Eventually(t, func() bool { + return !strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID) + }, 5*time.Second, 100*time.Millisecond) // Attempt to cancel the session again over HTTP from tenant SQL pod 1, so that we can see the error message. - err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp) require.Equal(t, false, cancelSessionResp.Canceled) - require.Equal(t, fmt.Sprintf("session ID %s not found", hex.EncodeToString(session.ID)), cancelSessionResp.Error) + require.Equal(t, fmt.Sprintf("session ID %s not found", sessionID), cancelSessionResp.Error) } func selectClusterQueryIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { @@ -717,27 +717,25 @@ func TestTenantStatusCancelQuery(t *testing.T) { // Open a SQL session on tenant SQL pod 0 and start a long-running query. sqlPod0 := helper.testCluster().tenantConn(0) - results := make(chan struct{}) - errors := make(chan error) - defer close(results) - defer close(errors) + resultCh := make(chan struct{}) + errorCh := make(chan error) + defer close(resultCh) + defer close(errorCh) go func() { if _, err := sqlPod0.DB.ExecContext(ctx, "SELECT pg_sleep(60)"); err != nil { - errors <- err + errorCh <- err } else { - results <- struct{}{} + resultCh <- struct{}{} } }() // See the query over HTTP on tenant SQL pod 1. - httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1) - require.NoError(t, err) + httpPod1 := helper.testCluster().tenantHTTPClient(t, 1) defer httpPod1.Close() var listSessionsResp serverpb.ListSessionsResponse var query serverpb.ActiveQuery require.Eventually(t, func() bool { - err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp) - require.NoError(t, err) + httpPod1.GetJSON("/_status/sessions", &listSessionsResp) for _, s := range listSessionsResp.Sessions { for _, q := range s.ActiveQueries { if q.Sql == "SELECT pg_sleep(60)" { @@ -750,32 +748,32 @@ func TestTenantStatusCancelQuery(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "query not found") // See the query over SQL on tenant SQL pod 0. - require.Contains(t, selectClusterQueryIDs(t, sqlPod0), query.ID) + require.Eventually(t, func() bool { + return strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID) + }, 10*time.Second, 100*time.Millisecond) // Cancel the query over HTTP on tenant SQL pod 1. cancelQueryReq := serverpb.CancelQueryRequest{QueryID: query.ID} cancelQueryResp := serverpb.CancelQueryResponse{} - err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) require.Equal(t, true, cancelQueryResp.Canceled, "expected query to be canceled, but encountered unexpected error %s", cancelQueryResp.Error) // No longer see the query over SQL on tenant SQL pod 0. require.Eventually(t, func() bool { - return !strings.Contains(reflect.ValueOf(selectClusterQueryIDs(t, sqlPod0)).String(), query.ID) + return !strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID) }, 10*time.Second, 100*time.Millisecond, "expected query %s to no longer be visible in crdb_internal.cluster_queries", query.ID) select { - case <-results: + case <-resultCh: t.Fatalf("Expected long-running query to have been canceled with error.") - case err := <-errors: + case err := <-errorCh: require.Equal(t, "pq: query execution canceled", err.Error()) } // Attempt to cancel the query again over HTTP from tenant SQL pod 1, so that we can see the error message. - err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) - require.NoError(t, err) + httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp) require.Equal(t, false, cancelQueryResp.Canceled) require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error) } diff --git a/pkg/ccl/serverccl/statusccl/tenant_test_utils.go b/pkg/ccl/serverccl/statusccl/tenant_test_utils.go index f0afda6378ba..82a31493e475 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_test_utils.go +++ b/pkg/ccl/serverccl/statusccl/tenant_test_utils.go @@ -150,12 +150,10 @@ func (c tenantCluster) tenantConn(idx int) *sqlutils.SQLRunner { return c[idx].tenantDB } -func (c tenantCluster) tenantHTTPJSONClient(idx int) (*httpJSONClient, error) { +func (c tenantCluster) tenantHTTPClient(t *testing.T, idx int) *httpClient { client, err := c[idx].tenant.RPCContext().GetHTTPClient() - if err != nil { - return nil, err - } - return &httpJSONClient{client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()}, nil + require.NoError(t, err) + return &httpClient{t: t, client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()} } func (c tenantCluster) tenantSQLStats(idx int) *persistedsqlstats.PersistedSQLStats { @@ -172,21 +170,22 @@ func (c tenantCluster) cleanup(t *testing.T) { } } -type httpJSONClient struct { +type httpClient struct { + t *testing.T client http.Client baseURL string } -func (c *httpJSONClient) GetJSON(path string, response protoutil.Message) error { - return httputil.GetJSON(c.client, c.baseURL+path, response) +func (c *httpClient) GetJSON(path string, response protoutil.Message) { + err := httputil.GetJSON(c.client, c.baseURL+path, response) + require.NoError(c.t, err) } -func (c *httpJSONClient) PostJSON( - path string, request protoutil.Message, response protoutil.Message, -) error { - return httputil.PostJSON(c.client, c.baseURL+path, request, response) +func (c *httpClient) PostJSON(path string, request protoutil.Message, response protoutil.Message) { + err := httputil.PostJSON(c.client, c.baseURL+path, request, response) + require.NoError(c.t, err) } -func (c *httpJSONClient) Close() { +func (c *httpClient) Close() { c.client.CloseIdleConnections() }