Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
72348: kv: fix improperly wrapped errors r=arulajmani,nvanbenschoten a=rafiss

refs #42510

I'm working on a linter that detects errors that are not wrapped
correctly, and it discovered these.

Release note: None

72692: server: tidy tenant testing client r=matthewtodd a=matthewtodd

Pushing these assertions into the test client removes some of the line
noise from the tests, making them easier to read. We also generalize the
tenant http client to provide plain text as well as json responses.

Release note: None

72717: server: add check for sql instance ID in tenant status server rpc handlers r=lindseyjin a=lindseyjin

Previously, most RPC handlers in tenant_status.go did not check if the
request was received by the status server before the sql pod had
finished its instance ID initialization. This commit adds in those
missing checks.

Release note: None

72749: cmd/roachtest: fix `--cluster` for new `roachprod` r=ajwerner a=ajwerner

The `roachprod list` command no longer takes a positional argument to filter
the results. This commit adopts the new `--pattern` flag in `roachtest`. This
is important when providing your own cluster for roachtest with the `--cluster`
flag.

Release note: None

72770: ui: create error Boundary component r=maryliag a=maryliag

Previously, when there was an error on the pages on the console
it would crash and nothing would load. This commit introduces
an error boundary, making an error message show instead of the
blank page.


<img width="897" alt="Screen Shot 2021-11-15 at 1 47 39 PM" src="https://user-images.githubusercontent.com/1017486/141837360-eeba1e0b-5245-43cf-a0a8-e185f2cfa179.png">

Release note (ui change): New page for when something goes wrong
and the page crashes.

72777: sql: disallow ALTER COLUMN TYPE for columns stored in secondary indexes r=ajwerner a=ajwerner

It is broken. After the change, the index, after the change, refers to a column
which is not in the table.

Fixes #72718.
Relates to #72771.

Release note (sql change): The experimental `ALTER COLUMN TYPE` statement
is no longer permitted when the column is stored in a secondary index. Prior
to this change, that was the only sort of secondary index membership which
was allowed, but the result of the operation was a subtly corrupted table.

72787: util: make FastIntMap return -1 when not found always r=ajwerner a=ajwerner

Before this change, it would only return -1 if it had not outgrown
the small map. This lead to the oddities uncovered in #72718.

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
Co-authored-by: Lindsey Jin <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
  • Loading branch information
6 people committed Nov 15, 2021
8 parents 4021c73 + 5d84e46 + c9c9207 + 83cdd2b + a79f290 + be9c596 + dccab78 + a242793 commit cb5d510
Show file tree
Hide file tree
Showing 19 changed files with 259 additions and 72 deletions.
58 changes: 28 additions & 30 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"encoding/hex"
"fmt"
"net/url"
"reflect"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -660,12 +659,10 @@ func TestTenantStatusCancelSession(t *testing.T) {
sqlPod0.Exec(t, "SELECT 1")

// See the session over HTTP on tenant SQL pod 1.
httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1)
require.NoError(t, err)
httpPod1 := helper.testCluster().tenantHTTPClient(t, 1)
defer httpPod1.Close()
listSessionsResp := serverpb.ListSessionsResponse{}
err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
require.NoError(t, err)
httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
var session serverpb.Session
for _, s := range listSessionsResp.Sessions {
if s.LastActiveQuery == "SELECT 1" {
Expand All @@ -676,24 +673,27 @@ func TestTenantStatusCancelSession(t *testing.T) {
require.NotNil(t, session.ID, "session not found")

// See the session over SQL on tenant SQL pod 0.
require.Contains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID))
sessionID := hex.EncodeToString(session.ID)
require.Eventually(t, func() bool {
return strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID)
}, 5*time.Second, 100*time.Millisecond)

// Cancel the session over HTTP from tenant SQL pod 1.
cancelSessionReq := serverpb.CancelSessionRequest{SessionID: session.ID}
cancelSessionResp := serverpb.CancelSessionResponse{}
err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp)
require.NoError(t, err)
httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp)
require.Equal(t, true, cancelSessionResp.Canceled, cancelSessionResp.Error)

// No longer see the session over SQL from tenant SQL pod 0.
// (The SQL client maintains an internal connection pool and automatically reconnects.)
require.NotContains(t, selectClusterSessionIDs(t, sqlPod0), hex.EncodeToString(session.ID))
require.Eventually(t, func() bool {
return !strings.Contains(strings.Join(selectClusterSessionIDs(t, sqlPod0), ","), sessionID)
}, 5*time.Second, 100*time.Millisecond)

// Attempt to cancel the session again over HTTP from tenant SQL pod 1, so that we can see the error message.
err = httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp)
require.NoError(t, err)
httpPod1.PostJSON("/_status/cancel_session/"+session.NodeID.String(), &cancelSessionReq, &cancelSessionResp)
require.Equal(t, false, cancelSessionResp.Canceled)
require.Equal(t, fmt.Sprintf("session ID %s not found", hex.EncodeToString(session.ID)), cancelSessionResp.Error)
require.Equal(t, fmt.Sprintf("session ID %s not found", sessionID), cancelSessionResp.Error)
}

func selectClusterQueryIDs(t *testing.T, conn *sqlutils.SQLRunner) []string {
Expand All @@ -717,27 +717,25 @@ func TestTenantStatusCancelQuery(t *testing.T) {

// Open a SQL session on tenant SQL pod 0 and start a long-running query.
sqlPod0 := helper.testCluster().tenantConn(0)
results := make(chan struct{})
errors := make(chan error)
defer close(results)
defer close(errors)
resultCh := make(chan struct{})
errorCh := make(chan error)
defer close(resultCh)
defer close(errorCh)
go func() {
if _, err := sqlPod0.DB.ExecContext(ctx, "SELECT pg_sleep(60)"); err != nil {
errors <- err
errorCh <- err
} else {
results <- struct{}{}
resultCh <- struct{}{}
}
}()

// See the query over HTTP on tenant SQL pod 1.
httpPod1, err := helper.testCluster().tenantHTTPJSONClient(1)
require.NoError(t, err)
httpPod1 := helper.testCluster().tenantHTTPClient(t, 1)
defer httpPod1.Close()
var listSessionsResp serverpb.ListSessionsResponse
var query serverpb.ActiveQuery
require.Eventually(t, func() bool {
err = httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
require.NoError(t, err)
httpPod1.GetJSON("/_status/sessions", &listSessionsResp)
for _, s := range listSessionsResp.Sessions {
for _, q := range s.ActiveQueries {
if q.Sql == "SELECT pg_sleep(60)" {
Expand All @@ -750,32 +748,32 @@ func TestTenantStatusCancelQuery(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond, "query not found")

// See the query over SQL on tenant SQL pod 0.
require.Contains(t, selectClusterQueryIDs(t, sqlPod0), query.ID)
require.Eventually(t, func() bool {
return strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID)
}, 10*time.Second, 100*time.Millisecond)

// Cancel the query over HTTP on tenant SQL pod 1.
cancelQueryReq := serverpb.CancelQueryRequest{QueryID: query.ID}
cancelQueryResp := serverpb.CancelQueryResponse{}
err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp)
require.NoError(t, err)
httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp)
require.Equal(t, true, cancelQueryResp.Canceled,
"expected query to be canceled, but encountered unexpected error %s", cancelQueryResp.Error)

// No longer see the query over SQL on tenant SQL pod 0.
require.Eventually(t, func() bool {
return !strings.Contains(reflect.ValueOf(selectClusterQueryIDs(t, sqlPod0)).String(), query.ID)
return !strings.Contains(strings.Join(selectClusterQueryIDs(t, sqlPod0), ","), query.ID)
}, 10*time.Second, 100*time.Millisecond,
"expected query %s to no longer be visible in crdb_internal.cluster_queries", query.ID)

select {
case <-results:
case <-resultCh:
t.Fatalf("Expected long-running query to have been canceled with error.")
case err := <-errors:
case err := <-errorCh:
require.Equal(t, "pq: query execution canceled", err.Error())
}

// Attempt to cancel the query again over HTTP from tenant SQL pod 1, so that we can see the error message.
err = httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp)
require.NoError(t, err)
httpPod1.PostJSON("/_status/cancel_query/0", &cancelQueryReq, &cancelQueryResp)
require.Equal(t, false, cancelQueryResp.Canceled)
require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error)
}
25 changes: 12 additions & 13 deletions pkg/ccl/serverccl/statusccl/tenant_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,10 @@ func (c tenantCluster) tenantConn(idx int) *sqlutils.SQLRunner {
return c[idx].tenantDB
}

func (c tenantCluster) tenantHTTPJSONClient(idx int) (*httpJSONClient, error) {
func (c tenantCluster) tenantHTTPClient(t *testing.T, idx int) *httpClient {
client, err := c[idx].tenant.RPCContext().GetHTTPClient()
if err != nil {
return nil, err
}
return &httpJSONClient{client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()}, nil
require.NoError(t, err)
return &httpClient{t: t, client: client, baseURL: "https://" + c[idx].tenant.HTTPAddr()}
}

func (c tenantCluster) tenantSQLStats(idx int) *persistedsqlstats.PersistedSQLStats {
Expand All @@ -172,21 +170,22 @@ func (c tenantCluster) cleanup(t *testing.T) {
}
}

type httpJSONClient struct {
type httpClient struct {
t *testing.T
client http.Client
baseURL string
}

func (c *httpJSONClient) GetJSON(path string, response protoutil.Message) error {
return httputil.GetJSON(c.client, c.baseURL+path, response)
func (c *httpClient) GetJSON(path string, response protoutil.Message) {
err := httputil.GetJSON(c.client, c.baseURL+path, response)
require.NoError(c.t, err)
}

func (c *httpJSONClient) PostJSON(
path string, request protoutil.Message, response protoutil.Message,
) error {
return httputil.PostJSON(c.client, c.baseURL+path, request, response)
func (c *httpClient) PostJSON(path string, request protoutil.Message, response protoutil.Message) {
err := httputil.PostJSON(c.client, c.baseURL+path, request, response)
require.NoError(c.t, err)
}

func (c *httpJSONClient) Close() {
func (c *httpClient) Close() {
c.client.CloseIdleConnections()
}
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,8 @@ func (c *clusterImpl) validate(
) error {
// Perform validation on the existing cluster.
c.status("checking that existing cluster matches spec")
sargs := []string{roachprod, "list", c.name, "--json", "--quiet"}
pattern := "^" + regexp.QuoteMeta(c.name) + "$"
sargs := []string{roachprod, "list", "--pattern", pattern, "--json", "--quiet"}
out, err := execCmdWithBuffer(ctx, l, sargs...)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,7 +2205,7 @@ func skipStaleReplicas(
if !routing.Valid() {
return noMoreReplicasErr(
ambiguousError,
errors.Newf("routing information detected to be stale; lastErr: %s", lastErr))
errors.Wrap(lastErr, "routing information detected to be stale"))
}

for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func verifyCleanup(key roachpb.Key, eng storage.Engine, t *testing.T, coords ...
//lint:ignore SA1019 historical usage of deprecated eng.MVCCGetProto is OK
ok, _, _, err := eng.MVCCGetProto(storage.MakeMVCCMetadataKey(key), meta)
if err != nil {
return fmt.Errorf("error getting MVCC metadata: %s", err)
return errors.Wrap(err, "error getting MVCC metadata")
}
if ok && meta.Txn != nil {
return fmt.Errorf("found unexpected write intent: %s", meta)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5019,7 +5019,7 @@ func setupClusterWithSubsumedRange(
newDesc, err = tc.AddVoters(desc.StartKey.AsRawKey(), tc.Target(1))
if kv.IsExpectedRelocateError(err) {
// Retry.
return errors.Newf("ChangeReplicas: received error %s", err)
return errors.Wrap(err, "ChangeReplicas received error")
}
return nil
})
Expand Down
55 changes: 55 additions & 0 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (t *tenantStatusServer) ListLocalSessions(
if err != nil {
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

return &serverpb.ListSessionsResponse{Sessions: sessions}, nil
}

Expand All @@ -177,6 +182,9 @@ func (t *tenantStatusServer) CancelQuery(
if err := t.checkCancelPrivilege(ctx, reqUsername, findSessionByQueryID(request.QueryID)); err != nil {
return nil, err
}
if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

response := serverpb.CancelQueryResponse{}
distinctErrorMessages := map[string]struct{}{}
Expand Down Expand Up @@ -243,6 +251,10 @@ func (t *tenantStatusServer) CancelSession(
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

response := serverpb.CancelSessionResponse{}
distinctErrorMessages := map[string]struct{}{}

Expand Down Expand Up @@ -299,6 +311,10 @@ func (t *tenantStatusServer) ListContentionEvents(
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

var response serverpb.ListContentionEventsResponse

podFn := func(ctx context.Context, client interface{}, _ base.SQLInstanceID) (interface{}, error) {
Expand Down Expand Up @@ -340,6 +356,15 @@ func (t *tenantStatusServer) ListContentionEvents(
return &response, nil
}

func (t *tenantStatusServer) ListLocalContentionEvents(
ctx context.Context, req *serverpb.ListContentionEventsRequest,
) (*serverpb.ListContentionEventsResponse, error) {
if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}
return t.baseStatusServer.ListLocalContentionEvents(ctx, req)
}

func (t *tenantStatusServer) ResetSQLStats(
ctx context.Context, req *serverpb.ResetSQLStatsRequest,
) (*serverpb.ResetSQLStatsResponse, error) {
Expand All @@ -350,6 +375,10 @@ func (t *tenantStatusServer) ResetSQLStats(
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

response := &serverpb.ResetSQLStatsResponse{}
controller := t.sqlServer.pgServer.SQLServer.GetSQLStatsController()

Expand Down Expand Up @@ -658,9 +687,23 @@ func (t *tenantStatusServer) iteratePods(
func (t *tenantStatusServer) ListDistSQLFlows(
ctx context.Context, request *serverpb.ListDistSQLFlowsRequest,
) (*serverpb.ListDistSQLFlowsResponse, error) {
if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

return t.ListLocalDistSQLFlows(ctx, request)
}

func (t *tenantStatusServer) ListLocalDistSQLFlows(
ctx context.Context, request *serverpb.ListDistSQLFlowsRequest,
) (*serverpb.ListDistSQLFlowsResponse, error) {
if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

return t.baseStatusServer.ListLocalDistSQLFlows(ctx, request)
}

// Profile implements the profiling endpoint by delegating the request
// to the local handler. No facility for requesting profiles from
// remote nodes is facilitated at this time. Requests for nodes other
Expand All @@ -671,6 +714,10 @@ func (t *tenantStatusServer) Profile(
ctx = propagateGatewayMetadata(ctx)
ctx = t.AnnotateCtx(ctx)

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

if request.NodeId != "local" {
return nil, status.Errorf(codes.Unimplemented, "profiling arbitrary tenants is unsupported")
}
Expand All @@ -687,6 +734,10 @@ func (t *tenantStatusServer) IndexUsageStatistics(
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

localReq := &serverpb.IndexUsageStatisticsRequest{
NodeID: "local",
}
Expand Down Expand Up @@ -757,6 +808,10 @@ func (t *tenantStatusServer) ResetIndexUsageStats(
return nil, err
}

if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

localReq := &serverpb.ResetIndexUsageStatsRequest{
NodeID: "local",
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/alter_column_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ func alterColumnTypeGeneral(
return colInIndexNotSupportedErr
}
}
if !idx.Primary() {
for i := 0; i < idx.NumSecondaryStoredColumns(); i++ {
if idx.GetStoredColumnID(i) == col.GetID() {
return colInIndexNotSupportedErr
}
}
}
}

// Disallow ALTER COLUMN TYPE general inside an explicit transaction.
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/alter_column_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestSchemaChangeBeforeAlterColumnType(t *testing.T) {
},
},
}

defer close(waitBeforeContinuing)
s, db, _ := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(db)
defer s.Stopper().Stop(ctx)
Expand All @@ -353,7 +353,8 @@ ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (x);

<-swapNotification

expected := "pq: unimplemented: table test is currently undergoing a schema change"
expected := "pq: unimplemented: ALTER COLUMN TYPE requiring rewrite of on-disk data is currently not " +
"supported for columns that are part of an index"
sqlDB.ExpectErr(t, expected, `
SET enable_experimental_alter_column_type_general = true;
ALTER TABLE t.test ALTER COLUMN y TYPE STRING;`)
Expand Down
Loading

0 comments on commit cb5d510

Please sign in to comment.