sql, import: plumb row.Metrics from ExecutorConfig to row.Helper
Addresses: #67400

Add metrics for sql/row and pass them down from ExecutorConfig and
FlowCtx all the way to row.Helper. Like sql.Metrics, there are two
copies, one for user queries and one for internal queries. (I wanted to
make these part of sql.Metrics, but there are several users of sql/row
that do not operate under a sql.Server or connExecutor, so we are forced
to add row.Metrics to the ExecutorConfig and FlowCtx instead.)

I ran into difficulty passing these through import, as FlowCtx itself
is not plumbed through.

There are only two metrics at first, corresponding to
violations of sql.guardrails.max_row_size_{log|err}.

Release justification: Low-risk update to new functionality.

Release note (ops): Added four new metrics,
sql.guardrails.max_row_size_{log|err}.count{.internal}, which are
incremented whenever a large row violates the corresponding
sql.guardrails.max_row_size_{log|err} limit.
michae2 committed Aug 28, 2021
1 parent bf3b873 commit 590514b
Showing 23 changed files with 190 additions and 36 deletions.
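
For reference, a rough sketch of the new pkg/sql/row metrics this commit plumbs around (the struct and variable names are taken from the pkg/sql/exec_util.go hunk below; the metadata strings and the MetricStruct marker are assumptions, not part of the rendered diff):

package row

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// Metadata for the two guardrail counters; names follow the release note.
var (
	MetaMaxRowSizeLog = metric.Metadata{
		Name:        "sql.guardrails.max_row_size_log.count",
		Help:        "Count of rows larger than sql.guardrails.max_row_size_log",
		Measurement: "Rows",
		Unit:        metric.Unit_COUNT,
	}
	MetaMaxRowSizeErr = metric.Metadata{
		Name:        "sql.guardrails.max_row_size_err.count",
		Help:        "Count of rows larger than sql.guardrails.max_row_size_err",
		Measurement: "Rows",
		Unit:        metric.Unit_COUNT,
	}
)

// Metrics holds the counters incremented by row.Helper when a mutation
// violates one of the max_row_size guardrails. There is one copy for user
// queries and one for internal queries.
type Metrics struct {
	MaxRowSizeLogCount *metric.Counter
	MaxRowSizeErrCount *metric.Counter
}

// MetricStruct makes Metrics registrable with a metric.Registry.
func (Metrics) MetricStruct() {}
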
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/read_import_avro_test.go
@@ -254,7 +254,8 @@ func (th *testHelper) newRecordStream(

conv, err := row.NewDatumRowConverter(
context.Background(), th.schemaTable, nil, th.evalCtx.Copy(), nil,
nil /* seqChunkProvider */)
nil /* seqChunkProvider */, nil, /* metrics */
)
require.NoError(t, err)
return &testRecordStream{
producer: producer,
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
@@ -447,7 +447,7 @@ func makeDatumConverter(
) (*row.DatumRowConverter, error) {
conv, err := row.NewDatumRowConverter(
ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh,
importCtx.seqChunkProvider)
importCtx.seqChunkProvider, nil /* metrics */)
if err == nil {
conv.KvBatch.Source = fileCtx.source
}
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql.go
@@ -76,7 +76,7 @@ func newMysqldumpReader(
continue
}
conv, err := row.NewDatumRowConverter(ctx, tabledesc.NewBuilder(table.Desc).BuildImmutableTable(),
nil /* targetColNames */, evalCtx, kvCh, nil /* seqChunkProvider */)
nil /* targetColNames */, evalCtx, kvCh, nil /* seqChunkProvider */, nil /* metrics */)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
@@ -959,7 +959,7 @@ func newPgDumpReader(
colSubMap[col.GetName()] = i
}
conv, err := row.NewDatumRowConverter(ctx, tableDesc, targetCols, evalCtx, kvCh,
nil /* seqChunkProvider */)
nil /* seqChunkProvider */, nil /* metrics */)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_workload.go
@@ -218,7 +218,7 @@ func NewWorkloadKVConverter(
// This worker needs its own EvalContext and DatumAlloc.
func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalContext) error {
conv, err := row.NewDatumRowConverter(ctx, w.tableDesc, nil /* targetColNames */, evalCtx,
w.kvCh, nil /* seqChunkProvider */)
w.kvCh, nil /* seqChunkProvider */, nil /* metrics */)
if err != nil {
return err
}
16 changes: 12 additions & 4 deletions pkg/server/server_sql.go
@@ -392,9 +392,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
cfg.registry.AddMetricStruct(jobRegistry.MetricsStruct())

distSQLMetrics := execinfra.MakeDistSQLMetrics(cfg.HistogramWindowInterval())
cfg.registry.AddMetricStruct(distSQLMetrics)

// Set up Lease Manager
var lmKnobs lease.ManagerTestingKnobs
if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil {
@@ -469,6 +466,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
}))

distSQLMetrics := execinfra.MakeDistSQLMetrics(cfg.HistogramWindowInterval())
cfg.registry.AddMetricStruct(distSQLMetrics)
rowMetrics := sql.NewRowMetrics(false /* internal */)
cfg.registry.AddMetricStruct(rowMetrics)
internalRowMetrics := sql.NewRowMetrics(true /* internal */)
cfg.registry.AddMetricStruct(internalRowMetrics)

virtualSchemas, err := sql.NewVirtualSchemaHolder(ctx, cfg.Settings)
if err != nil {
return nil, errors.Wrap(err, "creating virtual schema holder")
@@ -538,7 +542,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon)
},

Metrics: &distSQLMetrics,
Metrics: &distSQLMetrics,
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,

SQLLivenessReader: cfg.sqlLivenessProvider,
JobRegistry: jobRegistry,
@@ -668,6 +674,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
),

QueryCache: querycache.New(cfg.QueryCacheSize),
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,
ProtectedTimestampProvider: cfg.protectedtsProvider,
ExternalIODirConfig: cfg.ExternalIODirConfig,
GCJobNotifier: gcJobNotifier,
26 changes: 19 additions & 7 deletions pkg/sql/backfill.go
@@ -802,6 +802,7 @@ func TruncateInterleavedIndexes(
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
rd := row.MakeDeleter(
codec, table, nil /* requestedCols */, &execCfg.Settings.SV, true, /* internal */
execCfg.GetRowMetrics(true /* internal */),
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
@@ -883,7 +884,7 @@ func (sc *SchemaChanger) truncateIndexes(
}
rd := row.MakeDeleter(
sc.execCfg.Codec, tableDesc, nil /* requestedCols */, &sc.settings.SV,
true, /* internal */
true /* internal */, sc.execCfg.GetRowMetrics(true /* internal */),
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, nil /* *tree.EvalContext */); err != nil {
@@ -2044,7 +2045,10 @@ func runSchemaChangesInTxn(
return AlterColTypeInTxnNotSupportedErr
} else if col := m.AsColumn(); col != nil {
if !doneColumnBackfill && catalog.ColumnNeedsBackfill(col) {
if err := columnBackfillInTxn(ctx, planner.Txn(), planner.EvalContext(), planner.SemaCtx(), immutDesc, traceKV); err != nil {
if err := columnBackfillInTxn(
ctx, planner.Txn(), planner.ExecCfg(), planner.EvalContext(), planner.SemaCtx(),
immutDesc, traceKV,
); err != nil {
return err
}
doneColumnBackfill = true
@@ -2065,7 +2069,8 @@
if col := m.AsColumn(); col != nil {
if !doneColumnBackfill && catalog.ColumnNeedsBackfill(col) {
if err := columnBackfillInTxn(
ctx, planner.Txn(), planner.EvalContext(), planner.SemaCtx(), immutDesc, traceKV,
ctx, planner.Txn(), planner.ExecCfg(), planner.EvalContext(), planner.SemaCtx(),
immutDesc, traceKV,
); err != nil {
return err
}
@@ -2399,6 +2404,7 @@ func validateUniqueWithoutIndexConstraintInTxn(
func columnBackfillInTxn(
ctx context.Context,
txn *kv.Txn,
execCfg *ExecutorConfig,
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
tableDesc catalog.TableDescriptor,
@@ -2414,8 +2420,11 @@
columnBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Mon, "local-column-backfill-mon")
}

rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
var backfiller backfill.ColumnBackfiller
if err := backfiller.InitForLocalUse(ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon); err != nil {
if err := backfiller.InitForLocalUse(
ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics,
); err != nil {
return err
}
defer backfiller.Close(ctx)
@@ -2453,7 +2462,9 @@ func indexBackfillInTxn(
}

var backfiller backfill.IndexBackfiller
if err := backfiller.InitForLocalUse(ctx, evalCtx, semaCtx, tableDesc, indexBackfillerMon); err != nil {
if err := backfiller.InitForLocalUse(
ctx, evalCtx, semaCtx, tableDesc, indexBackfillerMon,
); err != nil {
return err
}
defer backfiller.Close(ctx)
@@ -2485,9 +2496,10 @@ func indexTruncateInTxn(
alloc := &rowenc.DatumAlloc{}
var sp roachpb.Span
for done := false; !done; done = sp.Key == nil {
internal := evalCtx.SessionData().Internal
rd := row.MakeDeleter(
execCfg.Codec, tableDesc, nil /* requestedCols */, &execCfg.Settings.SV,
evalCtx.SessionData().Internal,
execCfg.Codec, tableDesc, nil /* requestedCols */, &execCfg.Settings.SV, internal,
execCfg.GetRowMetrics(internal),
)
td := tableDeleter{rd: rd, alloc: alloc}
if err := td.init(ctx, txn, evalCtx); err != nil {
11 changes: 9 additions & 2 deletions pkg/sql/backfill/backfill.go
@@ -67,6 +67,8 @@ type ColumnBackfiller struct {

// mon is a memory monitor linked with the ColumnBackfiller on creation.
mon *mon.BytesMonitor

rowMetrics *row.Metrics
}

// initCols is a helper to populate some column metadata on a ColumnBackfiller.
@@ -91,6 +93,7 @@ func (cb *ColumnBackfiller) init(
computedExprs []tree.TypedExpr,
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
rowMetrics *row.Metrics,
) error {
cb.evalCtx = evalCtx
cb.updateCols = append(cb.added, cb.dropped...)
@@ -130,6 +133,7 @@
return errors.AssertionFailedf("no memory monitor linked to ColumnBackfiller during init")
}
cb.mon = mon
cb.rowMetrics = rowMetrics

return cb.fetcher.Init(
evalCtx.Context,
@@ -154,6 +158,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
semaCtx *tree.SemaContext,
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
rowMetrics *row.Metrics,
) error {
cb.initCols(desc)
defaultExprs, err := schemaexpr.MakeDefaultExprs(
@@ -174,7 +179,7 @@
if err != nil {
return err
}
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon)
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
}

// InitForDistributedUse initializes a ColumnBackfiller for use as part of a
@@ -230,7 +235,8 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
// entire backfill process.
flowCtx.TypeResolverFactory.Descriptors.ReleaseAll(ctx)

return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon)
rowMetrics := flowCtx.GetRowMetrics()
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
}

// Close frees the resources used by the ColumnBackfiller.
@@ -269,6 +275,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
&cb.alloc,
&cb.evalCtx.Settings.SV,
cb.evalCtx.SessionData().Internal,
cb.rowMetrics,
)
if err != nil {
return roachpb.Key{}, err
4 changes: 3 additions & 1 deletion pkg/sql/create_table.go
@@ -456,6 +456,7 @@ func (n *createTableNode) startExec(params runParams) error {

// Instantiate a row inserter and table writer. It has a 1-1
// mapping to the definitions in the descriptor.
internal := params.p.SessionData().Internal
ri, err := row.MakeInserter(
params.ctx,
params.p.txn,
@@ -464,7 +465,8 @@
desc.PublicColumns(),
params.p.alloc,
&params.ExecCfg().Settings.SV,
params.p.SessionData().Internal,
internal,
params.ExecCfg().GetRowMetrics(internal),
)
if err != nil {
return err
21 changes: 21 additions & 0 deletions pkg/sql/exec_util.go
@@ -71,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -1051,6 +1052,8 @@ type ExecutorConfig struct {

SchemaChangerMetrics *SchemaChangerMetrics
FeatureFlagMetrics *featureflag.DenialMetrics
RowMetrics *row.Metrics
InternalRowMetrics *row.Metrics

TestingKnobs ExecutorTestingKnobs
MigrationTestingKnobs *migration.TestingKnobs
@@ -3002,3 +3005,21 @@ func DescsTxn(
) error {
return execCfg.CollectionFactory.Txn(ctx, execCfg.InternalExecutor, execCfg.DB, f)
}

// NewRowMetrics creates a row.Metrics struct for either internal or user
// queries.
func NewRowMetrics(internal bool) row.Metrics {
return row.Metrics{
MaxRowSizeLogCount: metric.NewCounter(getMetricMeta(row.MetaMaxRowSizeLog, internal)),
MaxRowSizeErrCount: metric.NewCounter(getMetricMeta(row.MetaMaxRowSizeErr, internal)),
}
}

// GetRowMetrics returns the proper RowMetrics for either internal or user
// queries.
func (cfg *ExecutorConfig) GetRowMetrics(internal bool) *row.Metrics {
if internal {
return cfg.InternalRowMetrics
}
return cfg.RowMetrics
}
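
NewRowMetrics above leans on an existing pkg/sql helper, getMetricMeta, which is not changed by this diff. A hedged sketch of what it presumably does, given the ".internal" suffix in the release note (the Help-string tweak is an assumption):

// getMetricMeta (pre-existing helper, sketched here for context) derives the
// internal-query variant of a metric's metadata.
func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata {
	if internal {
		meta.Name += ".internal"
		meta.Help += " (internal queries)"
	}
	return meta
}
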
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/execinfrapb",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
14 changes: 13 additions & 1 deletion pkg/sql/execinfra/server_config.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
@@ -107,7 +108,9 @@ type ServerConfig struct {
// because of RocksDB space amplification.
ParentDiskMonitor *mon.BytesMonitor

Metrics *DistSQLMetrics
Metrics *DistSQLMetrics
RowMetrics *row.Metrics
InternalRowMetrics *row.Metrics

// SQLLivenessReader provides access to reading the liveness of sessions.
SQLLivenessReader sqlliveness.Reader
@@ -284,3 +287,12 @@ func GetWorkMemLimit(flowCtx *FlowCtx) int64 {
}
return flowCtx.EvalCtx.SessionData().WorkMemLimit
}

// GetRowMetrics returns the proper RowMetrics for either internal or user
// queries.
func (flowCtx *FlowCtx) GetRowMetrics() *row.Metrics {
if flowCtx.EvalCtx.SessionData().Internal {
return flowCtx.Cfg.InternalRowMetrics
}
return flowCtx.Cfg.RowMetrics
}
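
With FlowCtx.GetRowMetrics in place, a DistSQL caller can thread the right counters into the row writers. An illustrative, assumed call site that mirrors the row.MakeDeleter changes in pkg/sql/backfill.go; makeDeleterForFlow is a hypothetical helper, not part of this commit:

// makeDeleterForFlow picks the user or internal row.Metrics off the FlowCtx
// and hands it to the row writer along with the existing arguments.
func makeDeleterForFlow(
	flowCtx *execinfra.FlowCtx, tableDesc catalog.TableDescriptor,
) row.Deleter {
	return row.MakeDeleter(
		flowCtx.Codec(), tableDesc, nil /* requestedCols */,
		&flowCtx.Cfg.Settings.SV,
		flowCtx.EvalCtx.SessionData().Internal, // internal
		flowCtx.GetRowMetrics(),                // metrics
	)
}
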