Skip to content

Commit

Permalink
Merge #66639 #68045
Browse files Browse the repository at this point in the history
66639: sql,server: change indexusagestats subsystem to issue cluster RPC fanout r=Azhng a=Azhng

sql: introduce crdb_internal.index_usage_stats virtual table

This commit introduces the crdb_internal.index_usage_stats virtual
table, which is backed by the new clusterindexusagestats package. This
new package implements a variant of the indexusagestats interface
and serves the data by issuing a cluster RPC fanout.

Addresses #64740

Followup to #66451

Release note (sql change): introduce crdb_internal.index_usage_statistics
virtual table to surface index usage statistics.
The sql.metrics.index_usage_stats.enabled cluster setting can be used to
turn the subsystem on or off. It defaults to true.

68045: kv: grab raftMu during no-op writes with local gossip triggers r=nvanbenschoten a=nvanbenschoten

Fixes #68011.

As of 9f8c019, it is now possible to have no-op writes that do not go through
Raft but do set one of the gossip triggers. These gossip triggers require the
raftMu to be held, so we were running into trouble when handling the local
eval results above Raft.

For instance, we see this case when a transaction sets the system config
trigger and then performs a delete range over an empty span before
committing. In this case, the transaction will have no intents to
remove, so it can auto-GC its record during an EndTxn. If its record was
never written in the first place, this is a no-op (as of 9f8c019).

There appear to be three ways we could solve this:
1. we can avoid setting gossip triggers on transactions that don't perform
   any writes.
2. we can force EndTxn requests with gossip triggers to go through Raft even
   if they are otherwise no-ops.
3. we can properly handle gossip triggers on the above Raft local eval result
   path.

This commit opts for the third option.

Co-authored-by: Azhng <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Jul 26, 2021
3 parents a35b845 + 0a95d40 + 584fb97 commit 82b32cf
Show file tree
Hide file tree
Showing 23 changed files with 1,470 additions and 532 deletions.
44 changes: 44 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -3195,6 +3195,50 @@ Response object returned by ResetSQLStats.



## IndexUsageStatistics

`GET /_status/indexusagestatistics`



Support status: [reserved](#support-status)

#### Request Parameters




Request object for issuing IndexUsageStatistics request.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| node_id | [string](#cockroach.server.serverpb.IndexUsageStatisticsRequest-string) | | node_id is the ID of the node where the stats data shall be retrieved from. If this is left empty, the cluster-wide aggregated result will be returned. | [reserved](#support-status) |







#### Response Parameters




Response object returned by IndexUsageStatistics.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| statistics | [cockroach.sql.CollectedIndexUsageStatistics](#cockroach.server.serverpb.IndexUsageStatisticsResponse-cockroach.sql.CollectedIndexUsageStatistics) | repeated | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ type TestingKnobs struct {
JobsTestingKnobs ModuleTestingKnobs
BackupRestore ModuleTestingKnobs
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ func (sm *replicaStateMachine) ApplySideEffects(
if cmd.IsLocal() {
// Handle the LocalResult.
if cmd.localResult != nil {
sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult)
sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult, true /* raftMuHeld */)
}

rejected := cmd.Rejected()
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,9 @@ func addSSTablePreApply(
return copied
}

func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult result.LocalResult) {
func (r *Replica) handleReadWriteLocalEvalResult(
ctx context.Context, lResult result.LocalResult, raftMuHeld bool,
) {
// Fields for which no action is taken in this method are zeroed so that
// they don't trigger an assertion at the end of the method (which checks
// that all fields were handled).
Expand Down Expand Up @@ -707,21 +709,35 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
lResult.MaybeAddToSplitQueue = false
}

// The following three triggers require the raftMu to be held. If a
// trigger is present, acquire the mutex if it is not held already.
maybeAcquireRaftMu := func() func() {
if raftMuHeld {
return func() {}
}
raftMuHeld = true
r.raftMu.Lock()
return r.raftMu.Unlock
}

if lResult.MaybeGossipSystemConfig {
defer maybeAcquireRaftMu()()
if err := r.MaybeGossipSystemConfigRaftMuLocked(ctx); err != nil {
log.Errorf(ctx, "%v", err)
}
lResult.MaybeGossipSystemConfig = false
}

if lResult.MaybeGossipSystemConfigIfHaveFailure {
defer maybeAcquireRaftMu()()
if err := r.MaybeGossipSystemConfigIfHaveFailureRaftMuLocked(ctx); err != nil {
log.Errorf(ctx, "%v", err)
}
lResult.MaybeGossipSystemConfigIfHaveFailure = false
}

if lResult.MaybeGossipNodeLiveness != nil {
defer maybeAcquireRaftMu()()
if err := r.MaybeGossipNodeLivenessRaftMuLocked(ctx, *lResult.MaybeGossipNodeLiveness); err != nil {
log.Errorf(ctx, "%v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *Replica) evalAndPropose(
if proposal.command == nil {
intents := proposal.Local.DetachEncounteredIntents()
endTxns := proposal.Local.DetachEndTxns(pErr != nil /* alwaysOnly */)
r.handleReadWriteLocalEvalResult(ctx, *proposal.Local)
r.handleReadWriteLocalEvalResult(ctx, *proposal.Local, false /* raftMuHeld */)

pr := proposalResult{
Reply: proposal.Local.Reply,
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"drain.go",
"grpc_server.go",
"idle_monitor.go",
"index_usage_stats.go",
"init.go",
"init_handshake.go",
"loopback.go",
Expand Down Expand Up @@ -124,6 +125,7 @@ go_library(
"//pkg/sql/flowinfra",
"//pkg/sql/gcjob",
"//pkg/sql/gcjob/gcjobnotifier",
"//pkg/sql/idxusage",
"//pkg/sql/optionalnodeliveness",
"//pkg/sql/parser",
"//pkg/sql/pgwire",
Expand Down Expand Up @@ -260,6 +262,7 @@ go_test(
"drain_test.go",
"graphite_test.go",
"idle_monitor_test.go",
"index_usage_stats_test.go",
"init_handshake_test.go",
"intent_test.go",
"main_test.go",
Expand Down Expand Up @@ -320,6 +323,7 @@ go_test(
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descpb",
"//pkg/sql/execinfrapb",
"//pkg/sql/idxusage",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlstats",
"//pkg/sql/tests",
Expand Down
112 changes: 112 additions & 0 deletions pkg/server/index_usage_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// IndexUsageStatistics is the gRPC handler for serving index usage statistics.
//
// The request is handled in one of three ways, depending on req.NodeID:
//   - empty: a cluster-wide RPC fanout is issued and the statistics returned
//     by every node are concatenated into a single response;
//   - refers to this node: the node-local statistics are returned directly;
//   - refers to another node: the request is forwarded to that node.
//
// The caller must pass the view-activity permission check. A NodeID that
// cannot be parsed is reported as codes.InvalidArgument. During a fanout, a
// failure reported for any node fails the whole request, because a partial
// aggregate would be an incorrect result.
func (s *statusServer) IndexUsageStatistics(
	ctx context.Context, req *serverpb.IndexUsageStatisticsRequest,
) (*serverpb.IndexUsageStatisticsResponse, error) {
	ctx = propagateGatewayMetadata(ctx)
	ctx = s.AnnotateCtx(ctx)

	if _, err := s.privilegeChecker.requireViewActivityPermission(ctx); err != nil {
		return nil, err
	}

	// localReq is what gets sent to other nodes instead of the incoming req.
	// It instructs the receiving node to return only its node-local stats and
	// not to propagate the RPC any further.
	localReq := &serverpb.IndexUsageStatisticsRequest{
		NodeID: "local",
	}

	if len(req.NodeID) > 0 {
		requestedNodeID, local, err := s.parseNodeID(req.NodeID)
		if err != nil {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		if local {
			statsReader := s.sqlServer.pgServer.SQLServer.GetLocalIndexStatistics()
			return indexUsageStatsLocal(statsReader)
		}

		statusClient, err := s.dialNode(ctx, requestedNodeID)
		if err != nil {
			return nil, err
		}
		return statusClient.IndexUsageStatistics(ctx, localReq)
	}

	// Cluster-wide fanout: dial every node, ask it for its local stats, and
	// append whatever it returns to the aggregated response.
	dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) {
		return s.dialNode(ctx, nodeID)
	}

	fetchIndexUsageStats := func(ctx context.Context, client interface{}, _ roachpb.NodeID) (interface{}, error) {
		statusClient := client.(serverpb.StatusClient)
		return statusClient.IndexUsageStatistics(ctx, localReq)
	}

	resp := &serverpb.IndexUsageStatisticsResponse{}
	aggFn := func(_ roachpb.NodeID, nodeResp interface{}) {
		stats := nodeResp.(*serverpb.IndexUsageStatisticsResponse)
		resp.Statistics = append(resp.Statistics, stats.Statistics...)
	}

	// Per-node failures are accumulated here and surfaced once the fanout
	// completes.
	var combinedError error
	errFn := func(_ roachpb.NodeID, nodeFnError error) {
		combinedError = errors.CombineErrors(combinedError, nodeFnError)
	}

	// It's unfortunate that we cannot use paginatedIterateNodes here because we
	// need to aggregate all stats before returning. Returning a partial result
	// yields an incorrect result.
	//
	// NOTE: req.NodeID is always empty on this path, so it contributes nothing
	// to the operation name below.
	if err := s.iterateNodes(ctx,
		fmt.Sprintf("requesting index usage stats for node %s", req.NodeID),
		dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil {
		return nil, err
	}

	// A node-level failure means resp is missing that node's statistics.
	// Report the failure rather than silently returning a partial aggregate.
	if combinedError != nil {
		return nil, combinedError
	}

	return resp, nil
}

// indexUsageStatsLocal builds an IndexUsageStatisticsResponse from the given
// node-local index usage stats. It iterates over idxUsageStats with default
// iterator options and copies each key/statistics pair into the response. If
// the iteration reports an error, that error is returned and the response is
// nil.
func indexUsageStatsLocal(
	idxUsageStats *idxusage.LocalIndexUsageStats,
) (*serverpb.IndexUsageStatisticsResponse, error) {
	var collected []roachpb.CollectedIndexUsageStatistics

	// collect copies one entry out of the stats container.
	collect := func(key *roachpb.IndexUsageKey, value *roachpb.IndexUsageStatistics) error {
		entry := roachpb.CollectedIndexUsageStatistics{
			Key:   *key,
			Stats: *value,
		}
		collected = append(collected, entry)
		return nil
	}

	err := idxUsageStats.ForEach(idxusage.IteratorOptions{}, collect)
	if err != nil {
		return nil, err
	}

	return &serverpb.IndexUsageStatisticsResponse{Statistics: collected}, nil
}
Loading

0 comments on commit 82b32cf

Please sign in to comment.