Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110458: rangefeed: add rangefeed scheduler metrics r=erikgrinaker,miretskiy a=aliher1911

This commit adds two rangefeed scheduler metrics to assist performance debugging.

Latency histogram which is showing how long events spent waiting in queue before they are scheduled for processing:
- `kv.rangefeed.scheduler.normal.latency`
- `kv.rangefeed.scheduler.system.latency`

Queue size which is showing how many ranges are pending event to be processed:
- `kv.rangefeed.scheduler.normal.queue_size`
- `kv.rangefeed.scheduler.system.queue_size`

Fixes: cockroachdb#110420

Release note: None

110609: server: replace admin checks with VIEWCLUSTERMETADATA and REPAIRCLUSTERMETADATA r=rafiss a=rafiss

Release note (security update): Endpoints in the admin and status server
that previously required the admin role now can be used by users with
the VIEWCLUSTERMETADATA or REPAIRCLUSTERMETADATA system privilege,
depending on whether the endpoint is read-only or can modify state.

fixes cockroachdb#79571
fixes cockroachdb#109814
Release note: None

111045: rangefeed: create catchup iterators eagerly r=erikgrinaker a=aliher1911

Previously, catchup iterators were created in the main rangefeed processor work loop. This is negatively affecting scheduler based processors as this operation could be slow.
This commit makes iterator creation eager, simplifying error handling and making rangefeed times delays lower.

Epic: [CRDB-26372](https://cockroachlabs.atlassian.net/browse/CRDB-26372)

Fixes: cockroachdb#111060
Fixes: cockroachdb#111040

Release note: None

Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
3 people committed Sep 22, 2023
4 parents ed75b08 + ab00ae9 + 2325016 + 54b8a73 commit 235babd
Show file tree
Hide file tree
Showing 27 changed files with 429 additions and 254 deletions.
6 changes: 5 additions & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@
<tr><td>STORAGE</td><td>kv.rangefeed.mem_system</td><td>Memory usage by rangefeeds on system ranges</td><td>Memory</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.processors_goroutine</td><td>Number of active RangeFeed processors using goroutines</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.processors_scheduler</td><td>Number of active RangeFeed processors using scheduler</td><td>Processors</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.registrations</td><td>Number of active rangefeed registrations</td><td>Registrations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.registrations</td><td>Number of active RangeFeed registrations</td><td>Registrations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.normal.latency</td><td>KV RangeFeed normal scheduler latency</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.normal.queue_size</td><td>Number of entries in the KV RangeFeed normal scheduler queue</td><td>Pending Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.system.latency</td><td>KV RangeFeed system scheduler latency</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.scheduler.system.queue_size</td><td>Number of entries in the KV RangeFeed system scheduler queue</td><td>Pending Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.replica_circuit_breaker.num_tripped_events</td><td>Number of times the per-Replica circuit breakers tripped since process start.</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.replica_circuit_breaker.num_tripped_replicas</td><td>Number of Replicas for which the per-Replica circuit breaker is currently tripped.<br/><br/>A nonzero value indicates range or replica unavailability, and should be investigated.<br/>Replicas in this state will fail-fast all inbound requests.<br/></td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.replica_read_batch_evaluate.dropped_latches_before_eval</td><td>Number of times read-only batches dropped latches before evaluation.</td><td>Batches</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/backupccl/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)
Expand Down Expand Up @@ -127,16 +129,16 @@ func doAlterBackupSchedules(
s.incJob.ScheduleID())
}

// Check that the user is admin or the owner of the schedules being altered.
isAdmin, err := p.UserHasAdminRole(ctx, p.User())
// Check that the user has privileges or is the owner of the schedules being altered.
hasPriv, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPAIRCLUSTERMETADATA, p.User())
if err != nil {
return err
}
isOwnerOfFullJob := s.fullJob == nil || s.fullJob.Owner() == p.User()
isOwnerOfIncJob := s.incJob == nil || s.incJob.Owner() == p.User()
if !isAdmin && !(isOwnerOfFullJob && isOwnerOfIncJob) {
return pgerror.New(pgcode.InsufficientPrivilege, "must be admin or owner of the "+
"schedules being altered.")
if !hasPriv && !(isOwnerOfFullJob && isOwnerOfIncJob) {
return pgerror.Newf(pgcode.InsufficientPrivilege, "must be admin or the owner of the "+
"schedules being altered, or have %s privilege", privilege.REPAIRCLUSTERMETADATA)
}

if s, err = processFullBackupRecurrence(
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/testdata/backup-restore/schedule-privileges
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ foocluster_admin BACKUP INTO LATEST IN 'external://foo/cluster' WITH OPTIONS (de
foocluster_admin BACKUP INTO 'external://foo/cluster' WITH OPTIONS (detached)

# nonadmin testuser is not allowed to drop a schedule they do not own.
exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser
exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser
DROP SCHEDULE $fullID
----
regex matches error
Expand Down Expand Up @@ -141,7 +141,7 @@ let $otherFullID $otherIncID
with schedules as (show schedules) select id from schedules where label='foocluster_admin_revoke' order by command->>'backup_type' asc;
----

exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser
exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser
DROP SCHEDULE $otherFullID
----
regex matches error
Expand Down Expand Up @@ -180,17 +180,17 @@ DROP SCHEDULE $testuserIncID;
----

# But testuser can't drop, alter, resume or pause the root owned schedules.
exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to PAUSE it) user=testuser
exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to PAUSE it) user=testuser
PAUSE SCHEDULE $otherFullID
----
regex matches error

exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to RESUME it) user=testuser
exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to RESUME it) user=testuser
RESUME SCHEDULE $otherFullID
----
regex matches error

exec-sql expect-error-regex=(must be admin or owner of the schedule [0-9]+ to DROP it) user=testuser
exec-sql expect-error-regex=(must have REPAIRCLUSTERMETADATA privilege or be owner of the schedule [0-9]+ to DROP it) user=testuser
DROP SCHEDULE $otherFullID;
----
regex matches error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ GRANT SYSTEM EXTERNALCONNECTION TO testuser
query-sql user=testuser
SHOW CREATE ALL EXTERNAL CONNECTIONS
----
pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS
pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS`

query-sql user=testuser
SHOW CREATE EXTERNAL CONNECTION foo
----
pq: must be admin or owner of the External Connection "foo"
pq: must have VIEWCLUSTERMETADATA privilege or be owner of the External Connection "foo"

# Create External Connection where testuser is the owner, they should be able to SHOW this object.
exec-sql user=testuser
Expand All @@ -95,7 +95,7 @@ CREATE EXTERNAL CONNECTION bar AS 'nodelocal://1/foo'
query-sql user=testuser
SHOW CREATE ALL EXTERNAL CONNECTIONS
----
pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS
pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS`

# TODO(aditymaru): Synthetic privileges do not have a concept of owners. Once they do, testuser will
# be able to run this query successfully since they are the owner of the External Connection object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ GRANT SYSTEM EXTERNALCONNECTION TO testuser
query-sql user=testuser
SHOW CREATE ALL EXTERNAL CONNECTIONS
----
pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS
pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS`

query-sql user=testuser
SHOW CREATE EXTERNAL CONNECTION foo
----
pq: must be admin or owner of the External Connection "foo"
pq: must have VIEWCLUSTERMETADATA privilege or be owner of the External Connection "foo"

# Create External Connection where testuser is the owner, they should be able to SHOW this object.
exec-sql user=testuser
Expand All @@ -91,7 +91,7 @@ CREATE EXTERNAL CONNECTION bar AS 'nodelocal://1/foo'
query-sql user=testuser
SHOW CREATE ALL EXTERNAL CONNECTIONS
----
pq: must be admin to run `SHOW CREATE ALL EXTERNAL CONNECTIONS
pq: must have VIEWCLUSTERMETADATA privilege to run `SHOW CREATE ALL EXTERNAL CONNECTIONS`

# TODO(aditymaru): Synthetic privileges do not have a concept of owners. Once they do, testuser will
# be able to run this query successfully since they are the owner of the External Connection object.
Expand Down
14 changes: 14 additions & 0 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.Ten
require.Error(t, err)
require.Contains(t, err.Error(), "Forbidden")

// VIEWCLUSTERMETADATA should allow the user to see the span stats.
grantStmt := `GRANT SYSTEM VIEWCLUSTERMETADATA TO authentic_user_noadmin;`
helper.TestCluster().TenantConn(0).Exec(t, grantStmt)

err = client.PostJSONChecked("/_status/span", &req, &resp)
require.NoError(t, err)
require.NotEmpty(t, resp.SpanToStats)

revokeStmt := `REVOKE SYSTEM VIEWCLUSTERMETADATA FROM authentic_user_noadmin;`
helper.TestCluster().TenantConn(0).Exec(t, revokeStmt)

adminClient := helper.TestCluster().TenantHTTPClient(t, 1, true)
adminClient.PostJSON("/_status/span", &req, &resp)
require.Greaterf(t, resp.SpanToStats[aSpan.String()].RangeCount, int32(0), "positive range count")
Expand Down Expand Up @@ -1508,6 +1519,9 @@ func testTenantHotRanges(_ context.Context, t *testing.T, helper serverccl.Tenan

client.PostJSON("/_status/v2/hotranges", &req, &resp)
require.NotEmpty(t, resp.Ranges)

revokeStmt := `REVOKE SYSTEM VIEWCLUSTERMETADATA FROM authentic_user_noadmin;`
helper.TestCluster().TenantConn(0).Exec(t, revokeStmt)
})

t.Run("test tenant hot ranges respects tenant isolation", func(t *testing.T) {
Expand Down
61 changes: 60 additions & 1 deletion pkg/kv/kvserver/rangefeed/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package rangefeed

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -38,7 +39,7 @@ var (
}
metaRangeFeedRegistrations = metric.Metadata{
Name: "kv.rangefeed.registrations",
Help: "Number of active rangefeed registrations",
Help: "Number of active RangeFeed registrations",
Measurement: "Registrations",
Unit: metric.Unit_COUNT,
}
Expand All @@ -54,6 +55,18 @@ var (
Measurement: "Processors",
Unit: metric.Unit_COUNT,
}
metaQueueTimeHistogramsTemplate = metric.Metadata{
Name: "kv.rangefeed.scheduler.%s.latency",
Help: "KV RangeFeed %s scheduler latency",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaQueueSizeTemplate = metric.Metadata{
Name: "kv.rangefeed.scheduler.%s.queue_size",
Help: "Number of entries in the KV RangeFeed %s scheduler queue",
Measurement: "Pending Ranges",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of RangeFeeds.
Expand Down Expand Up @@ -120,3 +133,49 @@ func NewFeedBudgetMetrics(histogramWindow time.Duration) *FeedBudgetPoolMetrics
"Memory usage by rangefeeds")),
}
}

// ShardMetrics metrics for individual scheduler shard.
type ShardMetrics struct {
// QueueTime is time spent by range in scheduler queue.
QueueTime metric.IHistogram
// QueueSize is number of elements in the queue recently observed by reader.
QueueSize *metric.Gauge
}

// MetricStruct implements metrics.Struct interface.
func (*ShardMetrics) MetricStruct() {}

// SchedulerMetrics for production monitoring of rangefeed Scheduler.
type SchedulerMetrics struct {
SystemPriority *ShardMetrics
NormalPriority *ShardMetrics
}

// MetricStruct implements metrics.Struct interface.
func (*SchedulerMetrics) MetricStruct() {}

// NewSchedulerMetrics creates metric struct for Scheduler.
func NewSchedulerMetrics(histogramWindow time.Duration) *SchedulerMetrics {
return &SchedulerMetrics{
SystemPriority: newSchedulerShardMetrics("system", histogramWindow),
NormalPriority: newSchedulerShardMetrics("normal", histogramWindow),
}
}

func newSchedulerShardMetrics(name string, histogramWindow time.Duration) *ShardMetrics {
expandTemplate := func(template metric.Metadata) metric.Metadata {
result := template
result.Name = fmt.Sprintf(template.Name, name)
result.Help = fmt.Sprintf(template.Help, name)
return result
}
return &ShardMetrics{
QueueTime: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: expandTemplate(metaQueueTimeHistogramsTemplate),
Duration: histogramWindow,
BucketConfig: metric.IOLatencyBuckets,
}),
QueueSize: metric.NewGauge(expandTemplate(metaQueueSizeTemplate)),
}
}
25 changes: 7 additions & 18 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ type Processor interface {
// provided an error when the registration closes.
//
// The optionally provided "catch-up" iterator is used to read changes from the
// engine which occurred after the provided start timestamp (exclusive).
// engine which occurred after the provided start timestamp (exclusive). If
// this method succeeds, registration must take ownership of iterator and
// subsequently close it. If method fails, iterator must be kept intact and
// would be closed by caller.
//
// If the method returns false, the processor will have been stopped, so calling
// Stop is not necessary. If the method returns true, it will also return an
Expand All @@ -174,7 +177,7 @@ type Processor interface {
Register(
span roachpb.RSpan,
startTS hlc.Timestamp, // exclusive
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
Expand Down Expand Up @@ -332,12 +335,6 @@ func NewLegacyProcessor(cfg Config) *LegacyProcessor {
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that can be used
// for catchup-scans. Takes the key span and exclusive start time to run the
// catchup scan for. It should be called from underneath a stopper task to
// ensure that the engine has not been closed.
type CatchUpIteratorConstructor func(roachpb.Span, hlc.Timestamp) (*CatchUpIterator, error)

// Start implements Processor interface.
//
// LegacyProcessor launches a goroutine to process rangefeed events and send
Expand Down Expand Up @@ -410,14 +407,6 @@ func (p *LegacyProcessor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
if err := r.maybeConstructCatchUpIter(); err != nil {
r.disconnect(kvpb.NewError(err))
return
}

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down Expand Up @@ -565,7 +554,7 @@ func (p *LegacyProcessor) sendStop(pErr *kvpb.Error) {
func (p *LegacyProcessor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIterConstructor CatchUpIteratorConstructor,
catchUpIter *CatchUpIterator,
withDiff bool,
stream Stream,
disconnectFn func(),
Expand All @@ -578,7 +567,7 @@ func (p *LegacyProcessor) Register(

blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff,
p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done,
)
select {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ func newTestProcessor(
o(&cfg)
}
if cfg.useScheduler {
sch := NewScheduler(SchedulerConfig{Workers: 1, PriorityWorkers: 1})
sch := NewScheduler(SchedulerConfig{
Workers: 1,
PriorityWorkers: 1,
Metrics: NewSchedulerMetrics(time.Second),
})
require.NoError(t, sch.Start(context.Background(), stopper))
cfg.Scheduler = sch
// Also create a dummy priority processor to populate priorityIDs for
Expand Down
Loading

0 comments on commit 235babd

Please sign in to comment.