Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stmtdiagnostics: remove the usage of gossip #83547

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,6 @@ const (
// client connections a node has open. This is used by other nodes in the
// cluster to build a map of the gossip network.
KeyGossipClientsPrefix = "gossip-clients"

// KeyGossipStatementDiagnosticsRequest is the gossip key for new statement
// diagnostics requests. The value is the id of the request that generated
// the notification, as a little-endian-encoded uint64.
// stmtDiagnosticsRequestRegistry listens for notifications and responds by
// polling for new requests.
KeyGossipStatementDiagnosticsRequest = "stmt-diag-req"

// KeyGossipStatementDiagnosticsRequestCancellation is the gossip key for
// canceling an existing diagnostics request. The value is the id of the
// request that needs to be canceled, as a little-endian-encoded uint64.
// stmtDiagnosticsRequestRegistry listens for notifications and responds by
// removing the canceled request from its registry.
KeyGossipStatementDiagnosticsRequestCancellation = "stmt-diag-cancel-req"
)

// MakeKey creates a canonical key under which to gossip a piece of
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
cfg.db,
cfg.gossip,
cfg.Settings,
)
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func startConnExecutor(
),
QueryCache: querycache.New(0),
TestingKnobs: ExecutorTestingKnobs{},
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st),
StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st),
HistogramWindowInterval: base.DefaultHistogramWindowInterval(),
CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec),
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/stmtdiagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/kv",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
107 changes: 9 additions & 98 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@ package stmtdiagnostics

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -38,7 +35,7 @@ import (
)

var pollingInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
settings.TenantReadOnly,
"sql.stmt_diagnostics.poll_interval",
"rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable",
10*time.Second)
Expand Down Expand Up @@ -80,19 +77,9 @@ type Registry struct {

rand *rand.Rand
}
st *cluster.Settings
ie sqlutil.InternalExecutor
db *kv.DB
gossip gossip.OptionalGossip

// gossipUpdateChan is used to notify the polling loop that a diagnostics
// request has been added. The gossip callback will not block sending on this
// channel.
gossipUpdateChan chan RequestID
// gossipCancelChan is used to notify the polling loop that a diagnostics
// request has been canceled. The gossip callback will not block sending on
// this channel.
gossipCancelChan chan RequestID
st *cluster.Settings
ie sqlutil.InternalExecutor
db *kv.DB
}

// Request describes a statement diagnostics request along with some conditional
Expand All @@ -113,25 +100,13 @@ func (r *Request) isConditional() bool {
}

// NewRegistry constructs a new Registry.
func NewRegistry(
ie sqlutil.InternalExecutor, db *kv.DB, gw gossip.OptionalGossip, st *cluster.Settings,
) *Registry {
func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry {
r := &Registry{
ie: ie,
db: db,
gossip: gw,
gossipUpdateChan: make(chan RequestID, 1),
gossipCancelChan: make(chan RequestID, 1),
st: st,
ie: ie,
db: db,
st: st,
}
r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano()))

// Some tests pass a nil gossip, and gossip is not available on SQL tenant
// servers.
g, ok := gw.Optional(47893)
if ok && g != nil {
g.RegisterCallback(gossip.KeyGossipStatementDiagnosticsRequest, r.gossipNotification)
}
return r
}

Expand Down Expand Up @@ -187,17 +162,6 @@ func (r *Registry) poll(ctx context.Context) {
select {
case <-pollIntervalChanged:
continue // go back around and maybe reset the timer
case reqID := <-r.gossipUpdateChan:
if r.findRequest(reqID) {
continue // request already exists, don't do anything
}
// Poll the data.
case reqID := <-r.gossipCancelChan:
r.cancelRequest(reqID)
// No need to poll the data (unlike above) because we don't have to
// read anything of the system table to remove the request from the
// registry.
continue
case <-timer.C:
timer.Read = true
case <-ctx.Done():
Expand Down Expand Up @@ -241,12 +205,6 @@ func (r *Registry) addRequestInternalLocked(
}
}

// findRequest reports whether the request with the given ID already exists.
// It holds r.mu for the duration of the lookup and delegates the actual check
// to findRequestLocked.
func (r *Registry) findRequest(requestID RequestID) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.findRequestLocked(requestID)
}

// findRequestLocked returns whether the request already exists. If the request
// is not ongoing and has already expired, it is removed from the registry (yet
// true is still returned).
Expand Down Expand Up @@ -291,11 +249,6 @@ func (r *Registry) insertRequestInternal(
minExecutionLatency time.Duration,
expiresAfter time.Duration,
) (RequestID, error) {
g, err := r.gossip.OptionalErr(48274)
if err != nil {
return 0, err
}

isSamplingProbabilitySupported := r.st.Version.IsActive(ctx, clusterversion.SampledStmtDiagReqs)
if !isSamplingProbabilitySupported && samplingProbability != 0 {
return 0, errors.New(
Expand All @@ -315,7 +268,7 @@ func (r *Registry) insertRequestInternal(

var reqID RequestID
var expiresAt time.Time
err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Check if there's already a pending request for this fingerprint.
row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn,
sessiondata.InternalExecutorOverride{
Expand Down Expand Up @@ -393,23 +346,11 @@ func (r *Registry) insertRequestInternal(
r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt)
}()

// Notify all the other nodes that they have to poll.
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(reqID))
if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequest, buf, 0 /* ttl */); err != nil {
log.Warningf(ctx, "error notifying of diagnostics request: %s", err)
}

return reqID, nil
}

// CancelRequest is part of the server.StmtDiagnosticsRequester interface.
func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
g, err := r.gossip.OptionalErr(48274)
if err != nil {
return err
}

row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
Expand All @@ -435,13 +376,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error {
reqID := RequestID(requestID)
r.cancelRequest(reqID)

// Notify all the other nodes that this request has been canceled.
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(reqID))
if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequestCancellation, buf, 0 /* ttl */); err != nil {
log.Warningf(ctx, "error notifying of diagnostics request cancellation: %s", err)
}

return nil
}

Expand Down Expand Up @@ -732,26 +666,3 @@ func (r *Registry) pollRequests(ctx context.Context) error {
}
return nil
}

// gossipNotification is called in response to a gossip update informing us that
// we need to poll. s is the gossip key that was updated; value carries the ID
// of the affected request, decoded from value.RawBytes as a little-endian
// uint64.
//
// Depending on the key, the ID is forwarded to gossipUpdateChan or
// gossipCancelChan. Both sends are non-blocking: if the channel cannot accept
// the ID right away, the notification is dropped.
//
// NOTE(review): binary.LittleEndian.Uint64 panics when given fewer than 8
// bytes, and no length check is performed on value.RawBytes here.
func (r *Registry) gossipNotification(s string, value roachpb.Value) {
switch s {
case gossip.KeyGossipStatementDiagnosticsRequest:
// A request was added: hand its ID over without blocking.
select {
case r.gossipUpdateChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)):
default:
// Don't pile up on these requests and don't block gossip.
}
case gossip.KeyGossipStatementDiagnosticsRequestCancellation:
// A request was canceled: hand its ID over without blocking.
select {
case r.gossipCancelChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)):
default:
// Don't pile up on these requests and don't block gossip.
}
default:
// We don't expect any other notifications. Perhaps in a future version
// we added other keys with the same prefix.
return
}
}
4 changes: 4 additions & 0 deletions pkg/sql/stmtdiagnostics/statement_diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) {
_, err := db0.Exec("CREATE TABLE test (x int PRIMARY KEY)")
require.NoError(t, err)

// Lower the polling interval to speed up the test.
_, err = db0.Exec("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '1ms'")
require.NoError(t, err)

var minExecutionLatency, expiresAfter time.Duration
var samplingProbability float64

Expand Down