Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
71875: changefeedccl: Add metrics scope library for changefeeds. r=miretskiy a=miretskiy

Add a utility library that allows creation of fixed number (8) of
custome "metric" scopes.  Each scope is intended to keep track of
a small set of SLI related metrics.
Scopes are named "tier0", "tier1", ... "tier7".

Follow on changes will integrate changefeed code with this library.

CRDB-2422

Release Notes: None

71907: backupccl: pause backup schedule in TestFullClusterBackup r=rhu713 a=rhu713

Currently the backup schedule in TestFullClusterBackup can be processed by the
job scheduler in the original DB after the backup has been taken, thus causing
a difference between the original and restored clusters. Prevent this
processing by setting the first run in the future and pausing the schedule.

Fixes #71435

Release note: None

71909: jobs: don't block on notify r=ajwerner a=ajwerner

We have a method to notify the registry to go scan for jobs. There was no
reason for it to block. It did. This commit makes it not block without
meaningfully changing the semantics.

There are better improvements to be had by making the jobs subsystem much
more targeted, but this is some serious bang for its buck in terms of speedup
vs. lines changed.

Release note (performance improvement): Creating many schema changes in
parallel now runs faster due to improved concurrency notifying the jobs
subsystem.

71938: ui: use push instead of replace on browser history on SQL Activity page r=maryliag a=maryliag

Previously, when changing tabs on SQL Activity page the history
would be replaced, this commits change to push to history, meaning
when the user clicks Back on the browser it will return for the
previous tab.

Release note: None

71948: bazel: bump size of `pkg/bench/rttanalysis:rttanalysis_test` r=rail a=rickystewart

Release note: none

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Rui Hu <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marylia Gutierrez <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
6 people committed Oct 25, 2021
6 parents 041aaff + 1e17220 + 5de7501 + a41f506 + 8388ee8 + 08a9531 commit 7c50de2
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/bench/rttanalysis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(

go_test(
name = "rttanalysis_test",
size = "large",
srcs = [
"alter_table_bench_test.go",
"bench_test.go",
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -144,8 +146,10 @@ CREATE TABLE data2.foo (a int);
sqlDB.Exec(t, `GRANT CREATE, SELECT ON DATABASE data TO system_ops;`)
sqlDB.Exec(t, `GRANT system_ops TO maxroach1;`)

// Populate system.scheduled_jobs table.
sqlDB.Exec(t, `CREATE SCHEDULE FOR BACKUP data.bank INTO $1 RECURRING '@hourly' FULL BACKUP ALWAYS`, LocalFoo)
// Populate system.scheduled_jobs table with a first run in the future to prevent immediate adoption.
firstRun := timeutil.Now().Add(time.Hour).Format(timeutil.TimestampWithoutTZFormat)
sqlDB.Exec(t, `CREATE SCHEDULE FOR BACKUP data.bank INTO $1 RECURRING '@hourly' FULL BACKUP ALWAYS WITH SCHEDULE OPTIONS first_run = $2`, LocalFoo, firstRun)
sqlDB.Exec(t, `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR BACKUP]`)

injectStats(t, sqlDB, "data.bank", "id")
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)
Expand Down
9 changes: 3 additions & 6 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,9 +1621,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
if err := sql.DescsTxn(ctx, r.execCfg, publishDescriptors); err != nil {
return err
}
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}

p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
if err := fn(); err != nil {
return err
Expand Down Expand Up @@ -1728,9 +1727,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)

if details.DescriptorCoverage == tree.AllDescriptors {
// We restore the system tables from the main data bundle so late because it
Expand Down
11 changes: 9 additions & 2 deletions pkg/ccl/changefeedccl/cdcutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cdcutils",
srcs = ["throttle.go"],
srcs = [
"metrics_scope.go",
"throttle.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -18,13 +21,17 @@ go_library(

go_test(
name = "cdcutils_test",
srcs = ["throttle_test.go"],
srcs = [
"metrics_scope_test.go",
"throttle_test.go",
],
embed = [":cdcutils"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/settings/cluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"@com_github_stretchr_testify//require",
],
)
77 changes: 77 additions & 0 deletions pkg/ccl/changefeedccl/cdcutils/metrics_scope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdcutils

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// maxSLIScopes is a static limit on the number of SLI scopes -- that is the number
// of SLI metrics we will keep track of.
// The limit is static due to metric.Registry limitations.
const maxSLIScopes = 8

// SLIMetrics is the list of SLI related metrics for changefeeds.
type SLIMetrics struct {
ErrorRetries *metric.Counter
// TODO(yevgeniy): Add more SLI related metrics.
}

// MetricStruct implements metric.Struct interface
func (*SLIMetrics) MetricStruct() {}

func makeSLIMetrics(prefix string) *SLIMetrics {
withPrefix := func(meta metric.Metadata) metric.Metadata {
meta.Name = fmt.Sprintf("%s.%s", prefix, meta.Name)
return meta
}

return &SLIMetrics{
ErrorRetries: metric.NewCounter(withPrefix(metric.Metadata{
Name: "error_retries",
Help: "Total retryable errors encountered this SLI",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
})),
}
}

// SLIScopes represents a set of SLI related metrics for a particular "scope".
type SLIScopes struct {
Scopes [maxSLIScopes]*SLIMetrics // Exported so that we can register w/ metrics registry.
names map[string]*SLIMetrics
}

// MetricStruct implements metric.Struct interface
func (*SLIScopes) MetricStruct() {}

// CreateSLIScopes creates changefeed specific SLI scope: a metric.Struct containing
// SLI specific metrics for each scope.
// The scopes are statically named "tier<number>", and each metric name
// contained in SLIMetrics will be prefixed by "changefeed.tier<number" prefix.
func CreateSLIScopes() *SLIScopes {
scope := &SLIScopes{
names: make(map[string]*SLIMetrics, maxSLIScopes),
}

for i := 0; i < maxSLIScopes; i++ {
scopeName := fmt.Sprintf("tier%d", i)
scope.Scopes[i] = makeSLIMetrics(fmt.Sprintf("changefeed.%s", scopeName))
scope.names[scopeName] = scope.Scopes[i]
}
return scope
}

// GetSLIMetrics returns a metric.Struct associated with the specified scope, or nil
// of no such scope exists.
func (s *SLIScopes) GetSLIMetrics(scopeName string) *SLIMetrics {
return s.names[scopeName]
}
57 changes: 57 additions & 0 deletions pkg/ccl/changefeedccl/cdcutils/metrics_scope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdcutils

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/require"
)

func TestMetricScope(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

scope := CreateSLIScopes()
require.NotNil(t, scope)

sliMetricNames := []string{
"error_retries",
}

expectedMetrics := make(map[string]struct{})
expectScopedMetrics := func(scope string) {
for _, n := range sliMetricNames {
expectedMetrics[fmt.Sprintf("changefeed.%s.%s", scope, n)] = struct{}{}
}
}

for i := 0; i < maxSLIScopes; i++ {
scopeName := fmt.Sprintf("tier%d", i)
require.NotNil(t, scope.GetSLIMetrics(scopeName))
expectScopedMetrics(scopeName)
}

require.Nil(t, scope.GetSLIMetrics("does not exist"))
require.Nil(t, scope.GetSLIMetrics(fmt.Sprintf("tier%d", maxSLIScopes+1)))

registry := metric.NewRegistry()
registry.AddMetricStruct(scope)
registry.Each(func(name string, _ interface{}) {
_, exists := expectedMetrics[name]
require.True(t, exists)
delete(expectedMetrics, name)
})
require.Equal(t, 0, len(expectedMetrics),
"remaining metrics: %s", expectedMetrics)
}
17 changes: 7 additions & 10 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func MakeRegistry(
execCtx: execCtxFn,
preventAdoptionFile: preventAdoptionFile,
td: td,
adoptionCh: make(chan adoptionNotice),
// Use a non-zero buffer to allow queueing of notifications.
// The writing method will use a default case to avoid blocking
// if a notification is already queued.
adoptionCh: make(chan adoptionNotice, 1),
}
if knobs != nil {
r.knobs = *knobs
Expand Down Expand Up @@ -251,15 +254,11 @@ func (r *Registry) MakeJobID() jobspb.JobID {
}

// NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs.
func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
func (r *Registry) NotifyToAdoptJobs(context.Context) {
select {
case r.adoptionCh <- resumeClaimedJobs:
case <-r.stopper.ShouldQuiesce():
return stop.ErrUnavailable
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}

// WaitForJobs waits for a given list of jobs to reach some sort
Expand Down Expand Up @@ -344,9 +343,7 @@ func (r *Registry) Run(
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
r.NotifyToAdoptJobs(ctx)
err := r.WaitForJobs(ctx, ex, jobs)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,6 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro
if fn := m.knobs.ManagerCreatedJobInterceptor; fn != nil {
fn(job)
}
err := m.jr.NotifyToAdoptJobs(ctx)
return true, err
m.jr.NotifyToAdoptJobs(ctx)
return true, nil
}
10 changes: 5 additions & 5 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ func startGCJob(
return err
}
log.Infof(ctx, "starting GC job %d", jobID)
return jobRegistry.NotifyToAdoptJobs(ctx)
jobRegistry.NotifyToAdoptJobs(ctx)
return nil
}

func (sc *SchemaChanger) execLogTags() *logtags.Buffer {
Expand Down Expand Up @@ -895,7 +896,8 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er
return err
}
log.Infof(ctx, "starting GC job %d", gcJobID)
return sc.jobRegistry.NotifyToAdoptJobs(ctx)
sc.jobRegistry.NotifyToAdoptJobs(ctx)
return nil
}

// RunStateMachineBeforeBackfill moves the state machine forward
Expand Down Expand Up @@ -1350,9 +1352,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
return err
}
// Notify the job registry to start jobs, in case we started any.
if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
sc.jobRegistry.NotifyToAdoptJobs(ctx)

// If any operations was skipped because a mutation was made
// redundant due to a column getting dropped later on then we should
Expand Down
10 changes: 2 additions & 8 deletions pkg/sql/schemachanger/scjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{
}); err != nil {
return err
}
err := execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if err != nil {
return err
}
execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
}

// If no stages exist, then execute a singe transaction
Expand All @@ -159,10 +156,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{
if err != nil {
return err
}
err = execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if err != nil {
return err
}
execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/ui/workspaces/cluster-ui/src/util/query/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export function getMatchParamByName(
export function syncHistory(
params: Record<string, string | undefined>,
history: History,
push?: boolean,
): void {
const nextSearchParams = new URLSearchParams(history.location.search);

Expand All @@ -80,10 +81,9 @@ export function syncHistory(
});

history.location.search = nextSearchParams.toString();
history.replace(history.location);
}

export function clearHistory(history: History): void {
history.location.search = "";
history.replace(history.location);
if (push) {
history.push(history.location);
} else {
history.replace(history.location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// All changes made on this file, should also be done on the equivalent
// file on managed-service repo.

import React from "react";
import React, { useState, useEffect } from "react";
import { Tabs } from "antd";
import { commonStyles, util } from "@cockroachlabs/cluster-ui";
import SessionsPageConnected from "src/views/sessions/sessionsPage";
Expand All @@ -23,14 +23,21 @@ const { TabPane } = Tabs;

const SQLActivityPage = (props: RouteComponentProps) => {
const defaultTab = util.queryByName(props.location, "tab") || "sessions";
const [currentTab, setCurrentTab] = React.useState(defaultTab);
const [currentTab, setCurrentTab] = useState(defaultTab);

const onTabChange = (tabId: string): void => {
setCurrentTab(tabId);
util.clearHistory(props.history);
util.syncHistory({ tab: tabId }, props.history);
props.history.location.search = "";
util.syncHistory({ tab: tabId }, props.history, true);
};

useEffect(() => {
const queryTab = util.queryByName(props.location, "tab") || "sessions";
if (queryTab !== currentTab) {
setCurrentTab(queryTab);
}
}, [props.location, currentTab]);

return (
<div>
<h3 className={commonStyles("base-heading")}>SQL Activity</h3>
Expand Down

0 comments on commit 7c50de2

Please sign in to comment.