Skip to content

Commit

Permalink
cli: run node status and node ls commands through SQL
Browse files Browse the repository at this point in the history
Earlier `node status` command used to hit statusServer api endpoint and `node ls`
internally called `node status`.

Now, `node status` and `node ls` commands internally query various tables in
crdb_internal and format the result.

Closes cockroachdb#20713.

Release note: None
  • Loading branch information
Nishant Gupta committed May 8, 2018
1 parent f548705 commit 860afca
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 147 deletions.
4 changes: 1 addition & 3 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,9 +1755,7 @@ func TestCLITimeout(t *testing.T) {
}

const exp = `node status 1 --timeout 1ns
operation timed out.
context deadline exceeded
pq: query execution canceled due to statement timeout
`
if out != exp {
err := errors.Errorf("unexpected output:\n%q\nwanted:\n%q", out, exp)
Expand Down
7 changes: 0 additions & 7 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,6 @@ type cliContext struct {
// Defaults set by InitCLIDefaults() above.
var cliCtx = cliContext{Config: baseCfg}

func cmdTimeoutContext(ctx context.Context) (context.Context, func()) {
if cliCtx.cmdTimeout != 0 {
return context.WithTimeout(ctx, cliCtx.cmdTimeout)
}
return context.WithCancel(ctx)
}

// sqlCtx captures the command-line parameters of the `sql` command.
// Defaults set by InitCLIDefaults() above.
var sqlCtx = struct {
Expand Down
231 changes: 94 additions & 137 deletions pkg/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"os"
"reflect"
"sort"
"strconv"
"time"

Expand All @@ -29,13 +28,11 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const (
localTimeFormat = "2006-01-02 15:04:05"
localTimeFormat = "2006-01-02 15:04:05.999999-07:00"
)

var lsNodesColumnHeaders = []string{
Expand All @@ -54,17 +51,26 @@ To retrieve the IDs for inactive members, see 'node status --decommission'.
}

func runLsNodes(cmd *cobra.Command, args []string) error {
const showDecommissioned = false
nodeStatuses, _, err := runStatusNodeInner(showDecommissioned, nil)
conn, err := getPasswordAndMakeSQLClient("cockroach node ls")
if err != nil {
return err
}
defer conn.Close()

var rows [][]string
for _, nodeStatus := range nodeStatuses {
rows = append(rows, []string{
strconv.FormatInt(int64(nodeStatus.Desc.NodeID), 10),
})
if cliCtx.cmdTimeout != 0 {
if err := conn.Exec(fmt.Sprintf("SET statement_timeout=%d", cliCtx.cmdTimeout), nil); err != nil {
return err
}
}

_, rows, err := runQuery(
conn,
makeQuery(`SELECT node_id FROM crdb_internal.gossip_liveness
WHERE decommissioning = false OR expiration > concat((now()::decimal(30,9))::text, ',0')`),
false,
)
if err != nil {
return err
}

return printQueryOutput(os.Stdout, lsNodesColumnHeaders, newRowSliceIter(rows, "r"))
Expand Down Expand Up @@ -113,105 +119,111 @@ var statusNodeCmd = &cobra.Command{
}

func runStatusNode(cmd *cobra.Command, args []string) error {
nodeStatuses, decommissionStatusResp, err := runStatusNodeInner(
nodeCtx.statusShowDecommission || nodeCtx.statusShowAll, args,
)
_, rows, err := runStatusNodeInner(args)
if err != nil {
return err
}

sliceIter := newRowSliceIter(nodeStatusesToRows(nodeStatuses, decommissionStatusResp), getStatusNodeAlignment())
sliceIter := newRowSliceIter(rows, getStatusNodeAlignment())
return printQueryOutput(os.Stdout, getStatusNodeHeaders(), sliceIter)
}

func runStatusNodeInner(
showDecommissioned bool, args []string,
) ([]status.NodeStatus, *serverpb.DecommissionStatusResponse, error) {
ctx, cancel := cmdTimeoutContext(context.Background())
defer cancel()
func runStatusNodeInner(args []string) ([]string, [][]string, error) {

joinUsingID := func(queries []string) (query string) {
for i, q := range queries {
if i == 0 {
query = q
continue
}
query = "(" + query + ") JOIN (" + q + ") USING (id)"
}
return
}

var nodeStatuses []status.NodeStatus
baseQuery := joinUsingID(
[]string{
"SELECT node_id AS id, address, tag AS build, started_at, updated_at from crdb_internal.kv_node_status",
`SELECT node_id AS id,
CASE WHEN expiration > concat((now()::decimal(30,9))::text, ',0') THEN true ELSE false END AS is_live
FROM crdb_internal.gossip_liveness`,
},
)

conn, _, finish, err := getClientGRPCConn(ctx)
rangesQuery := `
SELECT node_id AS id,
SUM((metrics->>'replicas.leaders')::DECIMAL)::INT AS replicas_leaders,
SUM((metrics->>'replicas.leaseholders')::DECIMAL)::INT AS replicas_leaseholders,
SUM((metrics->>'replicas')::DECIMAL)::INT AS ranges,
SUM((metrics->>'ranges.underreplicated')::DECIMAL)::INT AS ranges_underreplicated,
SUM((metrics->>'ranges.unavailable')::DECIMAL)::INT AS ranges_unavailable
FROM crdb_internal.kv_store_status
GROUP BY node_id`

statsQuery := `
SELECT node_id AS id,
SUM((metrics->>'livebytes')::DECIMAL)::INT AS live_bytes,
SUM((metrics->>'keybytes')::DECIMAL)::INT AS key_bytes,
SUM((metrics->>'valbytes')::DECIMAL)::INT AS value_bytes,
SUM((metrics->>'intentbytes')::DECIMAL)::INT AS intent_bytes,
SUM((metrics->>'sysbytes')::DECIMAL)::INT AS system_bytes
FROM crdb_internal.kv_store_status
GROUP BY node_id`

decommissionQuery := joinUsingID(
[]string{
`SELECT node_id AS id, SUM((metrics->>'replicas')::DECIMAL)::INT AS gossiped_replicas
FROM crdb_internal.kv_store_status GROUP BY node_id`,
`SELECT node_id AS id, decommissioning AS is_decommissioning, draining AS is_draining
FROM crdb_internal.gossip_liveness`,
},
)

conn, err := getPasswordAndMakeSQLClient("cockroach node status")
if err != nil {
return nil, nil, err
}
defer finish()
defer conn.Close()

c := serverpb.NewStatusClient(conn)
queriesToJoin := []string{baseQuery}

var decommissionStatusRequest *serverpb.DecommissionStatusRequest
if nodeCtx.statusShowAll || nodeCtx.statusShowRanges {
queriesToJoin = append(queriesToJoin, rangesQuery)
}
if nodeCtx.statusShowAll || nodeCtx.statusShowStats {
queriesToJoin = append(queriesToJoin, statsQuery)
}
if nodeCtx.statusShowAll || nodeCtx.statusShowDecommission {
queriesToJoin = append(queriesToJoin, decommissionQuery)
}

switch len(args) {
case 0:
// Show status for all nodes.
nodes, err := c.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
if cliCtx.cmdTimeout != 0 {
if err := conn.Exec(fmt.Sprintf("SET statement_timeout=%d", cliCtx.cmdTimeout), nil); err != nil {
return nil, nil, err
}
nodeStatuses = nodes.Nodes
decommissionStatusRequest = &serverpb.DecommissionStatusRequest{
NodeIDs: []roachpb.NodeID{},
}
}

switch len(args) {
case 0:
query := makeQuery("SELECT * FROM " + joinUsingID(queriesToJoin) + " ORDER BY id")
return runQuery(conn, query, false)
case 1:
nodeID := args[0]
nodeStatus, err := c.Node(ctx, &serverpb.NodeRequest{NodeId: nodeID})
nodeID, err := strconv.Atoi(args[0])
if err != nil {
return nil, nil, err
return nil, nil, errors.Errorf("could not parse node_id %s", args[0])
}
nodeIDs, err := parseNodeIDs(args)
query := makeQuery("SELECT * FROM "+joinUsingID(queriesToJoin)+" WHERE id = $1 ORDER BY id", nodeID)
headers, rows, err := runQuery(conn, query, false)
if err != nil {
return nil, nil, err
}
decommissionStatusRequest = &serverpb.DecommissionStatusRequest{
NodeIDs: nodeIDs,
}
if nodeStatus.Desc.NodeID == 0 {
// I'm not sure why the status call doesn't return an error when the given NodeID doesn't
// exist. This should be revisited.
//
// TODO(cdo): Look into why status call returns erroneous data when given node ID of 0.
return nil, nil, fmt.Errorf("Error: node %s doesn't exist", nodeID)
if len(rows) == 0 {
return nil, nil, fmt.Errorf("Error: node %d doesn't exist", nodeID)
}
nodeStatuses = []status.NodeStatus{*nodeStatus}

return headers, rows, nil
default:
return nil, nil, errors.Errorf("expected no arguments or a single node ID")
}

cAdmin, finish, err := getAdminClient(ctx)
if err != nil {
return nil, nil, err
}
defer finish()

decommissionStatusResp, err := cAdmin.DecommissionStatus(ctx, decommissionStatusRequest)
if err != nil {
return nil, nil, err
}

if !showDecommissioned {
for _, status := range decommissionStatusResp.Status {
if !status.Decommissioning || status.IsLive {
// Show this entry.
continue
}
for i := 0; i < len(nodeStatuses); i++ {
if nodeStatuses[i].Desc.NodeID == status.NodeID {
// Hide this entry (by swapping it out with the last one).
last := len(nodeStatuses) - 1
nodeStatuses[i] = nodeStatuses[last]
nodeStatuses = nodeStatuses[:last]
}
}
}
// Sort the surviving entries (again) by NodeID.
sort.Slice(nodeStatuses, func(i, j int) bool {
return nodeStatuses[i].Desc.NodeID < nodeStatuses[j].Desc.NodeID
})
}
return nodeStatuses, decommissionStatusResp, nil
}

func getStatusNodeHeaders() []string {
Expand Down Expand Up @@ -243,61 +255,6 @@ func getStatusNodeAlignment() string {
return align
}

// nodeStatusesToRows converts NodeStatuses to SQL-like result rows, so that we can pretty-print
// them. We also pass a decommission status object if status was called with the --decommission flag.
func nodeStatusesToRows(
statuses []status.NodeStatus, decomStatus *serverpb.DecommissionStatusResponse,
) [][]string {
// Create results that are like the results for SQL results, so that we can pretty-print them.
var rows [][]string
for i, nodeStatus := range statuses {
hostPort := nodeStatus.Desc.Address.AddressField
updatedAt := timeutil.Unix(0, nodeStatus.UpdatedAt)
updatedAtStr := updatedAt.Format(localTimeFormat)
startedAt := timeutil.Unix(0, nodeStatus.StartedAt)
startedAtStr := startedAt.Format(localTimeFormat)
build := nodeStatus.BuildInfo.Tag

metricVals := map[string]float64{}
for _, storeStatus := range nodeStatus.StoreStatuses {
for key, val := range storeStatus.Metrics {
metricVals[key] += val
}
}

row := []string{strconv.FormatInt(int64(nodeStatus.Desc.NodeID), 10),
hostPort,
build,
updatedAtStr,
startedAtStr,
strconv.FormatBool(decomStatus.Status[i].IsLive)}

if nodeCtx.statusShowAll || nodeCtx.statusShowRanges {
row = append(row,
strconv.FormatInt(int64(metricVals["replicas.leaders"]), 10),
strconv.FormatInt(int64(metricVals["replicas.leaseholders"]), 10),
strconv.FormatInt(int64(metricVals["replicas"]), 10),
strconv.FormatInt(int64(metricVals["ranges.unavailable"]), 10),
strconv.FormatInt(int64(metricVals["ranges.underreplicated"]), 10),
)
}
if nodeCtx.statusShowAll || nodeCtx.statusShowStats {
row = append(row,
strconv.FormatInt(int64(metricVals["livebytes"]), 10),
strconv.FormatInt(int64(metricVals["keybytes"]), 10),
strconv.FormatInt(int64(metricVals["valbytes"]), 10),
strconv.FormatInt(int64(metricVals["intentbytes"]), 10),
strconv.FormatInt(int64(metricVals["sysbytes"]), 10),
)
}
if nodeCtx.statusShowAll || nodeCtx.statusShowDecommission {
row = append(row, decommissionResponseValueToRows(decomStatus.Status)[i][2:]...)
}
rows = append(rows, row)
}
return rows
}

var decommissionNodesColumnHeaders = []string{
"id",
"is_live",
Expand Down

0 comments on commit 860afca

Please sign in to comment.