Skip to content

Commit

Permalink
Merge #35650 #35686
Browse files Browse the repository at this point in the history
35650: sql: add feature usage telemetry for CTEs and subqueries r=knz a=knz

First commit from #35678.

Informs https://github.com/cockroachlabs/registration/issues/203

This adds telemetry for CTEs and subqueries.

35686: storage: facilitate side effect mutation in applyRaftCommand r=andreimatei a=tbg

Future code changes (sending snapshots without historical Raft log) will
make it necessary to sometimes alter the ReplicatedEvalResult in
applyRaftCommand (turning a TruncatedState updated into a noop).

Make this possible by returning the modified result back up to the
caller.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Tobias Schottdorf <[email protected]>
  • Loading branch information
3 people committed Mar 13, 2019
3 parents ce1b4bd + 72b9d27 + b40672f commit 066eac3
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 20 deletions.
19 changes: 19 additions & 0 deletions pkg/server/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,14 @@ func TestReportUsage(t *testing.T) {
if _, err := db.Exec(`SELECT '1.2.3.4'::STRING::INET, '{"a":"b","c":123}'::JSON - 'a', ARRAY (SELECT 1)[1]`); err != nil {
t.Fatal(err)
}
// Try a CTE to check CTE feature reporting.
if _, err := db.Exec(`WITH a AS (SELECT 1) SELECT * FROM a`); err != nil {
t.Fatal(err)
}
// Try a correlated subquery to check that feature reporting.
if _, err := db.Exec(`SELECT x FROM (VALUES (1)) AS b(x) WHERE EXISTS(SELECT * FROM (VALUES (1)) AS a(x) WHERE a.x = b.x)`); err != nil {
t.Fatal(err)
}
}

tables, err := ts.collectSchemaInfo(ctx)
Expand Down Expand Up @@ -561,6 +569,13 @@ func TestReportUsage(t *testing.T) {
"sql.plan.ops.array.cons": 1,
"sql.plan.ops.array.flatten": 1,

// The subquery counter is exercised by `(1, 20, 30, 40) = (SELECT ...)`.
"sql.plan.subquery": 1,
// The correlated sq counter is exercised by `WHERE EXISTS ( ... )` above.
"sql.plan.subquery.correlated": 1,
// The CTE counter is exercised by `WITH a AS (SELECT 1) ...`.
"sql.plan.cte": 10,

"unimplemented.#33285.json_object_agg": 10,
"unimplemented.pg_catalog.pg_stat_wal_receiver": 10,
"unimplemented.syntax.#28751": 10,
Expand Down Expand Up @@ -709,8 +724,10 @@ func TestReportUsage(t *testing.T) {
`[true,false,false] INSERT INTO _ VALUES (length($1::STRING)), (__more1__)`,
`[true,false,false] INSERT INTO _(_, _) VALUES (_, _)`,
`[true,false,false] SELECT (_, _, __more2__) = (SELECT _, _, _, _ FROM _ LIMIT _)`,
`[true,false,false] SELECT _ FROM (VALUES (_)) AS _ (_) WHERE EXISTS (SELECT * FROM (VALUES (_)) AS _ (_) WHERE _._ = _._)`,
"[true,false,false] SELECT _::STRING::INET, _::JSONB - _, ARRAY (SELECT _)[_]",
`[true,false,false] UPDATE _ SET _ = _ + _`,
"[true,false,false] WITH _ AS (SELECT _) SELECT * FROM _",
`[true,false,true] CREATE TABLE _ (_ INT8 PRIMARY KEY, _ INT8, INDEX (_) INTERLEAVE IN PARENT _ (_))`,
`[true,false,true] SELECT _ / $1`,
`[true,false,true] SELECT _ / _`,
Expand Down Expand Up @@ -760,6 +777,7 @@ func TestReportUsage(t *testing.T) {
`INSERT INTO _ SELECT unnest(ARRAY[_, _, __more2__])`,
`INSERT INTO _(_, _) VALUES (_, _)`,
`SELECT (_, _, __more2__) = (SELECT _, _, _, _ FROM _ LIMIT _)`,
`SELECT _ FROM (VALUES (_)) AS _ (_) WHERE EXISTS (SELECT * FROM (VALUES (_)) AS _ (_) WHERE _._ = _._)`,
`SELECT _::STRING::INET, _::JSONB - _, ARRAY (SELECT _)[_]`,
`SELECT * FROM _ WHERE (_ = length($1::STRING)) OR (_ = $2)`,
`SELECT * FROM _ WHERE (_ = _) AND (_ = _)`,
Expand All @@ -771,6 +789,7 @@ func TestReportUsage(t *testing.T) {
`SET CLUSTER SETTING "server.time_until_store_dead" = _`,
`SET CLUSTER SETTING "diagnostics.reporting.send_crash_reports" = _`,
`SET application_name = _`,
`WITH _ AS (SELECT _) SELECT * FROM _`,
},
elemName: {
`SELECT _ FROM _ WHERE (_ = _) AND (lower(_) = lower(_))`,
Expand Down
17 changes: 13 additions & 4 deletions pkg/sql/opt/optbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
)

func checkArrayElementType(t types.T) error {
Expand Down Expand Up @@ -513,10 +515,17 @@ func (b *Builder) checkSubqueryOuterCols(
return
}

// Remember whether the query was correlated for the heuristic planner,
// to enhance error messages.
// TODO(knz): this can go away when the HP disappears.
b.IsCorrelated = true
if !b.IsCorrelated {
// Remember whether the query was correlated for the heuristic planner,
// to enhance error messages.
// TODO(knz): this can go away when the HP disappears.
b.IsCorrelated = true

// Register the use of correlation to telemetry.
// Note: we don't blindly increment the counter every time this
// method is called, to avoid double counting the same query.
telemetry.Inc(sqltelemetry.CorrelatedSubqueryUseCounter)
}

var inScopeCols opt.ColSet
if b.subquery != nil || inGroupingContext {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/optbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package optbuilder
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -485,6 +487,8 @@ func (b *Builder) buildCTE(ctes []*tree.CTE, inScope *scope) (outScope *scope) {
}
}

telemetry.Inc(sqltelemetry.CteUseCounter)

return outScope
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/opt/optbuilder/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package optbuilder
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
)

// subquery represents a subquery expression in an expression tree
Expand Down Expand Up @@ -205,6 +207,8 @@ func (b *Builder) buildSubqueryProjection(
out = b.constructProject(out, []scopeColumn{*col})
}

telemetry.Inc(sqltelemetry.SubqueryUseCounter)

return out, outScope
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/sql/sqltelemetry/planning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sqltelemetry

import "github.com/cockroachdb/cockroach/pkg/server/telemetry"

// CteUseCounter is to be incremented every time a CTE (WITH ...)
// is planned without error in a query.
var CteUseCounter = telemetry.GetCounterOnce("sql.plan.cte")

// SubqueryUseCounter is to be incremented every time a subquery is
// planned.
var SubqueryUseCounter = telemetry.GetCounterOnce("sql.plan.subquery")

// CorrelatedSubqueryUseCounter is to be incremented every time a
// correlated subquery has been processed during planning.
var CorrelatedSubqueryUseCounter = telemetry.GetCounterOnce("sql.plan.subquery.correlated")
4 changes: 4 additions & 0 deletions pkg/sql/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -213,6 +215,8 @@ func (v *subqueryVisitor) extractSubquery(
log.Infof(v.ctx, "collected subquery: %q -> %d", sub, sub.Idx)
}

telemetry.Inc(sqltelemetry.SubqueryUseCounter)

// The typing for subqueries is complex, but regular.
//
// * If the subquery is used in a single-row context:
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/with.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
)

// This file contains the implementation of common table expressions. See
Expand Down Expand Up @@ -122,6 +124,9 @@ func (p *planner) initWith(ctx context.Context, with *tree.With) (func(p *planne
}
return popCteNameEnvironment, nil
}

telemetry.Inc(sqltelemetry.CteUseCounter)

return nil, nil
}

Expand Down
32 changes: 16 additions & 16 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1932,12 +1932,10 @@ func (r *Replica) processRaftCommand(
tmpBatch.Close()
}

var delta enginepb.MVCCStats
{
var err error
delta, err = r.applyRaftCommand(
raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand(
ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch)
raftCmd.ReplicatedEvalResult.Delta = delta.ToStatsDelta()

// applyRaftCommand returned an error, which usually indicates
// either a serious logic bug in CockroachDB or a disk
Expand Down Expand Up @@ -2206,15 +2204,16 @@ func (r *Replica) acquireMergeLock(
// underlying state machine (i.e. the engine). When the state machine can not be
// updated, an error (which is likely fatal!) is returned and must be handled by
// the caller.
// The returned ReplicatedEvalResult replaces the caller's.
func (r *Replica) applyRaftCommand(
ctx context.Context,
idKey storagebase.CmdIDKey,
rResult storagepb.ReplicatedEvalResult,
raftAppliedIndex, leaseAppliedIndex uint64,
writeBatch *storagepb.WriteBatch,
) (enginepb.MVCCStats, error) {
) (storagepb.ReplicatedEvalResult, error) {
if raftAppliedIndex <= 0 {
return enginepb.MVCCStats{}, errors.New("raft command index is <= 0")
return storagepb.ReplicatedEvalResult{}, errors.New("raft command index is <= 0")
}
if writeBatch != nil && len(writeBatch.Data) > 0 {
// Record the write activity, passing a 0 nodeID because replica.writeStats
Expand Down Expand Up @@ -2253,7 +2252,7 @@ func (r *Replica) applyRaftCommand(
// If we have an out of order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return
// a corruption error.
return enginepb.MVCCStats{}, errors.Errorf("applied index jumped from %d to %d",
return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d",
oldRaftAppliedIndex, raftAppliedIndex)
}

Expand All @@ -2271,7 +2270,7 @@ func (r *Replica) applyRaftCommand(

if writeBatch != nil {
if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to apply WriteBatch")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch")
}
}

Expand All @@ -2290,7 +2289,7 @@ func (r *Replica) applyRaftCommand(
// that this new key is replacing.
err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats)
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to migrate to range applied state")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state")
}
usingAppliedStateKey = true
}
Expand All @@ -2305,7 +2304,7 @@ func (r *Replica) applyRaftCommand(
// lease index along with the mvcc stats, all in one key.
if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer,
raftAppliedIndex, leaseAppliedIndex, &ms); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set range applied state")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state")
}
} else {
// Advance the last applied index. We use a blind write in order to avoid
Expand All @@ -2314,7 +2313,7 @@ func (r *Replica) applyRaftCommand(
var appliedIndexNewMS enginepb.MVCCStats
if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS,
raftAppliedIndex, leaseAppliedIndex); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to set applied index")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index")
}
deltaStats.SysBytes += appliedIndexNewMS.SysBytes -
r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex)
Expand All @@ -2324,14 +2323,14 @@ func (r *Replica) applyRaftCommand(
// across all deltaStats).
ms.Add(deltaStats)
if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to update MVCCStats")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats")
}
}

if haveTruncatedState {
apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer)
if err != nil {
return enginepb.MVCCStats{}, err
return storagepb.ReplicatedEvalResult{}, err
}
if !apply {
// TODO(tbg): As written, there is low confidence that nil'ing out
Expand Down Expand Up @@ -2365,20 +2364,20 @@ func (r *Replica) applyRaftCommand(
rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
oldHS, err := rsl.LoadHardState(ctx, r.store.Engine())
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
}
assertHS = &oldHS
}
if err := batch.Commit(false); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "could not commit batch")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch")
}

if assertHS != nil {
// Load the HardState that was just committed (if any).
rsl := stateloader.Make(rResult.Split.RightDesc.RangeID)
newHS, err := rsl.LoadHardState(ctx, r.store.Engine())
if err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "unable to load HardState")
return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState")
}
// Assert that nothing moved "backwards".
if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) {
Expand All @@ -2389,7 +2388,8 @@ func (r *Replica) applyRaftCommand(

elapsed := timeutil.Since(start)
r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
return deltaStats, nil
rResult.Delta = deltaStats.ToStatsDelta()
return rResult, nil
}

func handleTruncatedStateBelowRaft(
Expand Down

0 comments on commit 066eac3

Please sign in to comment.