diff --git a/pkg/cli/cli_test.go b/pkg/cli/cli_test.go index 87877b13a7fa..16d27cffbef4 100644 --- a/pkg/cli/cli_test.go +++ b/pkg/cli/cli_test.go @@ -1836,9 +1836,7 @@ func TestCLITimeout(t *testing.T) { } const exp = `node status 1 --timeout 1ns -operation timed out. - -context deadline exceeded +pq: query execution canceled due to statement timeout ` if out != exp { err := errors.Errorf("unexpected output:\n%q\nwanted:\n%q", out, exp) diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 56138b0bd201..fc40df49a1e2 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -151,13 +151,6 @@ type cliContext struct { // Defaults set by InitCLIDefaults() above. var cliCtx = cliContext{Config: baseCfg} -func cmdTimeoutContext(ctx context.Context) (context.Context, func()) { - if cliCtx.cmdTimeout != 0 { - return context.WithTimeout(ctx, cliCtx.cmdTimeout) - } - return context.WithCancel(ctx) -} - // sqlCtx captures the command-line parameters of the `sql` command. // Defaults set by InitCLIDefaults() above. var sqlCtx = struct { diff --git a/pkg/cli/node.go b/pkg/cli/node.go index c08482247c47..e12852e3858b 100644 --- a/pkg/cli/node.go +++ b/pkg/cli/node.go @@ -20,7 +20,6 @@ import ( "math" "os" "reflect" - "sort" "strconv" "time" @@ -29,13 +28,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) const ( - localTimeFormat = "2006-01-02 15:04:05" + localTimeFormat = "2006-01-02 15:04:05.999999-07:00" ) var lsNodesColumnHeaders = []string{ @@ -54,17 +51,26 @@ To retrieve the IDs for inactive members, see 'node status --decommission'. } func runLsNodes(cmd *cobra.Command, args []string) error { - const showDecommissioned = false - nodeStatuses, _, err := runStatusNodeInner(showDecommissioned, nil) + conn, err := getPasswordAndMakeSQLClient("cockroach node ls") if err != nil { return err } + defer conn.Close() - var rows [][]string - for _, nodeStatus := range nodeStatuses { - rows = append(rows, []string{ - strconv.FormatInt(int64(nodeStatus.Desc.NodeID), 10), - }) + if cliCtx.cmdTimeout != 0 { + if err := conn.Exec(fmt.Sprintf("SET statement_timeout=%d", cliCtx.cmdTimeout), nil); err != nil { + return err + } + } + + _, rows, err := runQuery( + conn, + makeQuery(`SELECT node_id FROM crdb_internal.gossip_liveness + WHERE decommissioning = false OR split_part(expiration,',',1)::decimal > now()::decimal`), + false, + ) + if err != nil { + return err } return printQueryOutput(os.Stdout, lsNodesColumnHeaders, newRowSliceIter(rows, "r")) @@ -113,105 +119,126 @@ var statusNodeCmd = &cobra.Command{ } func runStatusNode(cmd *cobra.Command, args []string) error { - nodeStatuses, decommissionStatusResp, err := runStatusNodeInner( - nodeCtx.statusShowDecommission || nodeCtx.statusShowAll, args, - ) + _, rows, err := runStatusNodeInner(nodeCtx.statusShowDecommission || nodeCtx.statusShowAll, args) if err != nil { return err } - sliceIter := newRowSliceIter(nodeStatusesToRows(nodeStatuses, decommissionStatusResp), getStatusNodeAlignment()) + sliceIter := newRowSliceIter(rows, getStatusNodeAlignment()) return printQueryOutput(os.Stdout, getStatusNodeHeaders(), sliceIter) } -func runStatusNodeInner( - showDecommissioned bool, args []string, -) ([]status.NodeStatus, *serverpb.DecommissionStatusResponse, error) { - ctx, cancel := cmdTimeoutContext(context.Background()) - defer cancel() +func runStatusNodeInner(showDecommissioned bool, args []string) ([]string, [][]string, error) { - var nodeStatuses []status.NodeStatus + joinUsingID := func(queries []string) (query string) { + for i, q := range queries { + if i == 0 { + query = q + continue + } + query = "(" + query + ") JOIN (" + q + ") USING (id)" + } + return + } + + maybeAddActiveNodesFilter := func(query string) string { + activeNodesFilter := "decommissioning = false OR split_part(expiration,',',1)::decimal > now()::decimal" + if !showDecommissioned { + query += " WHERE " + activeNodesFilter + } + return query + } - conn, _, finish, err := getClientGRPCConn(ctx) + baseQuery := joinUsingID( + []string{ + "SELECT node_id AS id, address, tag AS build, started_at, updated_at from crdb_internal.kv_node_status", + maybeAddActiveNodesFilter( + `SELECT node_id AS id, + CASE WHEN split_part(expiration,',',1)::decimal > now()::decimal + THEN true + ELSE false + END AS is_live + FROM crdb_internal.gossip_liveness`, + ), + }, + ) + + rangesQuery := ` +SELECT node_id AS id, + SUM((metrics->>'replicas.leaders')::DECIMAL)::INT AS replicas_leaders, + SUM((metrics->>'replicas.leaseholders')::DECIMAL)::INT AS replicas_leaseholders, + SUM((metrics->>'replicas')::DECIMAL)::INT AS ranges, + SUM((metrics->>'ranges.underreplicated')::DECIMAL)::INT AS ranges_underreplicated, + SUM((metrics->>'ranges.unavailable')::DECIMAL)::INT AS ranges_unavailable +FROM crdb_internal.kv_store_status +GROUP BY node_id` + + statsQuery := ` +SELECT node_id AS id, + SUM((metrics->>'livebytes')::DECIMAL)::INT AS live_bytes, + SUM((metrics->>'keybytes')::DECIMAL)::INT AS key_bytes, + SUM((metrics->>'valbytes')::DECIMAL)::INT AS value_bytes, + SUM((metrics->>'intentbytes')::DECIMAL)::INT AS intent_bytes, + SUM((metrics->>'sysbytes')::DECIMAL)::INT AS system_bytes +FROM crdb_internal.kv_store_status +GROUP BY node_id` + + decommissionQuery := joinUsingID( + []string{ + `SELECT node_id AS id, SUM((metrics->>'replicas')::DECIMAL)::INT AS gossiped_replicas + FROM crdb_internal.kv_store_status GROUP BY node_id`, + `SELECT node_id AS id, decommissioning AS is_decommissioning, draining AS is_draining + FROM crdb_internal.gossip_liveness`, + }, + ) + + conn, err := getPasswordAndMakeSQLClient("cockroach node status") if err != nil { return nil, nil, err } - defer finish() + defer conn.Close() - c := serverpb.NewStatusClient(conn) + queriesToJoin := []string{baseQuery} - var decommissionStatusRequest *serverpb.DecommissionStatusRequest + if nodeCtx.statusShowAll || nodeCtx.statusShowRanges { + queriesToJoin = append(queriesToJoin, rangesQuery) + } + if nodeCtx.statusShowAll || nodeCtx.statusShowStats { + queriesToJoin = append(queriesToJoin, statsQuery) + } + if nodeCtx.statusShowAll || nodeCtx.statusShowDecommission { + queriesToJoin = append(queriesToJoin, decommissionQuery) + } - switch len(args) { - case 0: - // Show status for all nodes. - nodes, err := c.Nodes(ctx, &serverpb.NodesRequest{}) - if err != nil { + if cliCtx.cmdTimeout != 0 { + if err := conn.Exec(fmt.Sprintf("SET statement_timeout=%d", cliCtx.cmdTimeout), nil); err != nil { return nil, nil, err } - nodeStatuses = nodes.Nodes - decommissionStatusRequest = &serverpb.DecommissionStatusRequest{ - NodeIDs: []roachpb.NodeID{}, - } + } + + queryString := "SELECT * FROM " + joinUsingID(queriesToJoin) + switch len(args) { + case 0: + query := makeQuery(queryString + " ORDER BY id") + return runQuery(conn, query, false) case 1: - nodeID := args[0] - nodeStatus, err := c.Node(ctx, &serverpb.NodeRequest{NodeId: nodeID}) + nodeID, err := strconv.Atoi(args[0]) if err != nil { - return nil, nil, err + return nil, nil, errors.Errorf("could not parse node_id %s", args[0]) } - nodeIDs, err := parseNodeIDs(args) + query := makeQuery(queryString+" WHERE id = $1", nodeID) + headers, rows, err := runQuery(conn, query, false) if err != nil { return nil, nil, err } - decommissionStatusRequest = &serverpb.DecommissionStatusRequest{ - NodeIDs: nodeIDs, + if len(rows) == 0 { + return nil, nil, fmt.Errorf("Error: node %d doesn't exist", nodeID) } - if nodeStatus.Desc.NodeID == 0 { - // I'm not sure why the status call doesn't return an error when the given NodeID doesn't - // exist. This should be revisited. - // - // TODO(cdo): Look into why status call returns erroneous data when given node ID of 0. - return nil, nil, fmt.Errorf("Error: node %s doesn't exist", nodeID) - } - nodeStatuses = []status.NodeStatus{*nodeStatus} - + return headers, rows, nil default: return nil, nil, errors.Errorf("expected no arguments or a single node ID") } - - cAdmin, finish, err := getAdminClient(ctx) - if err != nil { - return nil, nil, err - } - defer finish() - - decommissionStatusResp, err := cAdmin.DecommissionStatus(ctx, decommissionStatusRequest) - if err != nil { - return nil, nil, err - } - - if !showDecommissioned { - for _, status := range decommissionStatusResp.Status { - if !status.Decommissioning || status.IsLive { - // Show this entry. - continue - } - for i := 0; i < len(nodeStatuses); i++ { - if nodeStatuses[i].Desc.NodeID == status.NodeID { - // Hide this entry (by swapping it out with the last one). - last := len(nodeStatuses) - 1 - nodeStatuses[i] = nodeStatuses[last] - nodeStatuses = nodeStatuses[:last] - } - } - } - // Sort the surviving entries (again) by NodeID. - sort.Slice(nodeStatuses, func(i, j int) bool { - return nodeStatuses[i].Desc.NodeID < nodeStatuses[j].Desc.NodeID - }) - } - return nodeStatuses, decommissionStatusResp, nil } func getStatusNodeHeaders() []string { @@ -243,61 +270,6 @@ func getStatusNodeAlignment() string { return align } -// nodeStatusesToRows converts NodeStatuses to SQL-like result rows, so that we can pretty-print -// them. We also pass a decommission status object if status was called with the --decommission flag. -func nodeStatusesToRows( - statuses []status.NodeStatus, decomStatus *serverpb.DecommissionStatusResponse, -) [][]string { - // Create results that are like the results for SQL results, so that we can pretty-print them. - var rows [][]string - for i, nodeStatus := range statuses { - hostPort := nodeStatus.Desc.Address.AddressField - updatedAt := timeutil.Unix(0, nodeStatus.UpdatedAt) - updatedAtStr := updatedAt.Format(localTimeFormat) - startedAt := timeutil.Unix(0, nodeStatus.StartedAt) - startedAtStr := startedAt.Format(localTimeFormat) - build := nodeStatus.BuildInfo.Tag - - metricVals := map[string]float64{} - for _, storeStatus := range nodeStatus.StoreStatuses { - for key, val := range storeStatus.Metrics { - metricVals[key] += val - } - } - - row := []string{strconv.FormatInt(int64(nodeStatus.Desc.NodeID), 10), - hostPort, - build, - updatedAtStr, - startedAtStr, - strconv.FormatBool(decomStatus.Status[i].IsLive)} - - if nodeCtx.statusShowAll || nodeCtx.statusShowRanges { - row = append(row, - strconv.FormatInt(int64(metricVals["replicas.leaders"]), 10), - strconv.FormatInt(int64(metricVals["replicas.leaseholders"]), 10), - strconv.FormatInt(int64(metricVals["replicas"]), 10), - strconv.FormatInt(int64(metricVals["ranges.unavailable"]), 10), - strconv.FormatInt(int64(metricVals["ranges.underreplicated"]), 10), - ) - } - if nodeCtx.statusShowAll || nodeCtx.statusShowStats { - row = append(row, - strconv.FormatInt(int64(metricVals["livebytes"]), 10), - strconv.FormatInt(int64(metricVals["keybytes"]), 10), - strconv.FormatInt(int64(metricVals["valbytes"]), 10), - strconv.FormatInt(int64(metricVals["intentbytes"]), 10), - strconv.FormatInt(int64(metricVals["sysbytes"]), 10), - ) - } - if nodeCtx.statusShowAll || nodeCtx.statusShowDecommission { - row = append(row, decommissionResponseValueToRows(decomStatus.Status)[i][2:]...) - } - rows = append(rows, row) - } - return rows -} - var decommissionNodesColumnHeaders = []string{ "id", "is_live",