Merge cockroachdb#92632 cockroachdb#92669 cockroachdb#92693 cockroachdb#92695 cockroachdb#92760 cockroachdb#92763

92632: opt: fix rare incorrect results due to sort between paired joins r=DrewKimball a=DrewKimball

Previously, paired joins could produce incorrect results when an ordering was required of their output and a sort was added between the joins to enforce that ordering.

This patch prevents a sort from being added to the output of the first join in a set of paired joins. This is necessary because the continuation column that is used to indicate false positives matched by the first join relies on the ordering being maintained between the joins.
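
As a rough illustration (not CockroachDB's actual execution code), the second join in a paired join relies on the candidate rows for each left input row arriving contiguously, with the continuation column false only on the first candidate of each group; a sort in between can interleave groups and break that bookkeeping. A minimal, hypothetical Go sketch of the dependency, where `candidate` and `secondJoin` are invented names for illustration:

```go
package main

import "fmt"

// candidate is one output row of the first join in a paired join: a possible
// match for some left input row that the second join may still reject as a
// false positive. The continuation column is false only for the first
// candidate of each left input row.
type candidate struct {
	val          string
	match        bool // does the second join's filter actually accept it?
	continuation bool
}

// secondJoin sketches how the second joiner groups candidates purely by
// contiguity and the continuation flag: it emits accepted candidates, or a
// single NULL-extended row when every candidate in a group is a false
// positive. If a sort reorders the input, groups split or merge and the
// output is wrong.
func secondJoin(rows []candidate) []string {
	var out []string
	flush := func(group []candidate) {
		if len(group) == 0 {
			return
		}
		matched := false
		for _, c := range group {
			if c.match {
				out = append(out, c.val)
				matched = true
			}
		}
		if !matched {
			out = append(out, "NULL") // left row with no real matches
		}
	}
	var group []candidate
	for _, c := range rows {
		if !c.continuation {
			flush(group)
			group = group[:0]
		}
		group = append(group, c)
	}
	flush(group)
	return out
}

func main() {
	// Two left rows: the first has one false positive and one real match;
	// the second has only a false positive, so it should null-extend.
	ordered := []candidate{
		{"a1", false, false}, {"a2", true, true},
		{"b1", false, false},
	}
	fmt.Println(secondJoin(ordered)) // [a2 NULL]
}
```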

Fixes cockroachdb#89603

Release note: None

92669: roachtest/cdc: export stats for initial scan test to roachperf r=jayshrivastava a=jayshrivastava

This change updates the cdc/initial_scan_only test to produce a `stats.json` artifact to be consumed by roachperf. This file contains stats for p99 foreground latency, changefeed throughput, and CPU usage.

Release note: None
Epic: None

<img width="940" alt="image" src="https://user-images.githubusercontent.com/18633281/204564990-740e86e2-5c43-4d45-a715-4932428a5851.png">


92693: dev: add rewritable paths for ccl execbuilder tests r=rharding6373 a=rharding6373

There are some ccl tests that use test files in
`/pkg/sql/opt/exec/execbuilder`. This commit adds that directory as a
rewritable path so that the `--rewrite` flag can be used with `dev`.

Release note: None
Epic: None

92695: sqlstats: record idle latency for transactions r=matthewtodd a=matthewtodd

Part of cockroachdb#86667
Follows cockroachdb#91098

Release note (sql change): A new NumericStat, idleLat, was introduced to the statistics column of crdb_internal.transaction_statistics, reporting the time spent waiting for the client to send statements while holding a transaction open.
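
A minimal sketch of reading the new stat from a client, assuming a local insecure cluster and that `idleLat` appears as a NumericStat (`mean`/`sqDiff`) under the `statistics` JSON of `crdb_internal.transaction_statistics`; the connection string and JSON path are illustrative assumptions, not authoritative:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
)

func main() {
	// Hypothetical local, insecure connection string.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// idleLat is assumed to sit under statistics->'statistics' alongside
	// commitLat, retryLat, etc.; adjust the path for your version.
	rows, err := db.Query(`
SELECT app_name,
       statistics->'statistics'->'idleLat'->>'mean' AS mean_idle_seconds
FROM crdb_internal.transaction_statistics
LIMIT 10`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var app string
		var meanIdle sql.NullString
		if err := rows.Scan(&app, &meanIdle); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%-20s idleLat mean: %s\n", app, meanIdle.String)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```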

92760: streamclient: replace usage of deprecated ioutil.ReadFile function r=stevendanna a=andyyang890

This patch fixes a lint error resulting from a usage of the
deprecated ioutil.ReadFile function.
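
For reference, `os.ReadFile` (Go 1.16+) has the same signature as the deprecated `ioutil.ReadFile`, so the change is a drop-in swap of the package; a minimal sketch with a hypothetical path:

```go
package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	// Previously: content, err := ioutil.ReadFile(path)
	content, err := os.ReadFile("testdata/example.pem") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes\n", len(content))
}
```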

Fixes cockroachdb#92761 

Release note: None

92763: jobsprotectedtsccl: unskip TestJobsProtectedTimestamp r=ajwerner a=ajwerner

It was fixed by cockroachdb#92692.

Fixes cockroachdb#91865.

Release note: None

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
7 people committed Nov 30, 2022
7 parents d764ec1 + 7cfcce1 + ee0fa07 + 64624fa + 756c671 + c0a9d9c + f1210ea commit 21ec411
Showing 25 changed files with 339 additions and 52 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/jobsccl/jobsprotectedtsccl/BUILD.bazel
@@ -29,7 +29,6 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
3 changes: 0 additions & 3 deletions pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go
@@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -133,8 +132,6 @@ WHERE
func TestJobsProtectedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 91865) // flaky test

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{

@@ -11,8 +11,8 @@ package streamclient
import (
"context"
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"testing"
"time"
@@ -107,7 +107,7 @@ INSERT INTO d.t2 VALUES (2);
v := ret.Query()
for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} {
path := v.Get(opt)
content, err := ioutil.ReadFile(path)
content, err := os.ReadFile(path)
require.NoError(t, err)
v.Set(opt, string(content))

1 change: 1 addition & 0 deletions pkg/cmd/dev/test.go
@@ -134,6 +134,7 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error {
// recursive directories ending in /...
extraRewritablePaths = []struct{ pkg, path string }{
{"pkg/ccl/logictestccl", "pkg/sql/logictest"},
{"pkg/ccl/logictestccl", "pkg/sql/opt/exec/execbuilder"},
{"pkg/sql/opt/memo", "pkg/sql/opt/testutils/opttester/testfixtures"},
{"pkg/sql/opt/norm", "pkg/sql/opt/testutils/opttester/testfixtures"},
{"pkg/sql/opt/xform", "pkg/sql/opt/testutils/opttester/testfixtures"},
2 changes: 1 addition & 1 deletion pkg/cmd/dev/testdata/datadriven/test
@@ -84,7 +84,7 @@ exec
dev test pkg/ccl/logictestccl -f=TestTenantLogic/3node-tenant/system -v --rewrite
----
bazel info workspace --color=no
bazel test pkg/ccl/logictestccl:all --nocache_test_results --test_env=GOTRACEBACK=all --test_env=COCKROACH_WORKSPACE=crdb-checkout --test_arg -rewrite --sandbox_writable_path=crdb-checkout/pkg/ccl/logictestccl --sandbox_writable_path=crdb-checkout/pkg/sql/logictest --test_filter=TestTenantLogic/3node-tenant/system --test_arg -test.v --test_sharding_strategy=disabled --test_output all
bazel test pkg/ccl/logictestccl:all --nocache_test_results --test_env=GOTRACEBACK=all --test_env=COCKROACH_WORKSPACE=crdb-checkout --test_arg -rewrite --sandbox_writable_path=crdb-checkout/pkg/ccl/logictestccl --sandbox_writable_path=crdb-checkout/pkg/sql/logictest --sandbox_writable_path=crdb-checkout/pkg/sql/opt/exec/execbuilder --test_filter=TestTenantLogic/3node-tenant/system --test_arg -test.v --test_sharding_strategy=disabled --test_output all

exec
dev test pkg/spanconfig/spanconfigkvsubscriber -f=TestDecodeSpanTargets -v --stream-output
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/clusterstats/exporter.go
@@ -48,6 +48,7 @@ type AggQuery struct {
Query string
AggFn AggregateFn
Interval Interval
Tag string
}

// StatExporter defines an interface to export statistics to roachperf.
@@ -278,6 +279,9 @@ func (cs *clusterStatCollector) getStatSummary(
}

ret.AggTag = summaryQuery.Query
if summaryQuery.Tag != "" {
ret.AggTag = summaryQuery.Tag
}
// If there is more than one label name associated with the summary, we
// cannot be sure which is the correct label.
if len(taggedSummarySeries) != 1 {
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"canary.go",
"cancel.go",
"cdc.go",
"cdc_stats.go",
"chaos.go",
"clearrange.go",
"cli.go",
46 changes: 44 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
@@ -77,6 +78,7 @@ type cdcTester struct {
crdbNodes option.NodeListOption
workloadNode option.NodeListOption
logger *logger.Logger
promCfg *prometheus.Config

// sinkType -> sinkURI
sinkCache map[sinkType]string
@@ -85,6 +87,43 @@ type cdcTester struct {
doneCh chan struct{}
}

// startStatsCollection sets the start point of the stats collection window
// and returns a function which should be called at the end of the test to dump a
// stats.json file to the artifacts directory.
func (ct *cdcTester) startStatsCollection() func() {
if ct.promCfg == nil {
ct.t.Error("prometheus configuration is nil")
}
promClient, err := clusterstats.SetupCollectorPromClient(ct.ctx, ct.cluster, ct.t.L(), ct.promCfg)
if err != nil {
ct.t.Errorf("error creating prometheus client for stats collector: %s", err)
}

statsCollector := clusterstats.NewStatsCollector(ct.ctx, promClient)
startTime := timeutil.Now()
return func() {
endTime := timeutil.Now()
err := statsCollector.Exporter().Export(ct.ctx, ct.cluster, ct.t,
startTime,
endTime,
[]clusterstats.AggQuery{sqlServiceLatencyAgg, changefeedThroughputAgg, cpuUsageAgg},
func(stats map[string]clusterstats.StatSummary) (string, float64) {
// TODO(jayant): update this metric to be more accurate.
// It may be worth plugging in real latency values from the latency
// verifier here in the future for more accuracy. However, it may not be
// worth the added complexity. Since latency verifier failures will show
// up as roachtest failures, we don't need to make them very apparent in
// roachperf. Note that other roachperf stats, such as the aggregate stats
// above, will be accurate.
return "Total Run Time (mins)", endTime.Sub(startTime).Minutes()
},
)
if err != nil {
ct.t.Errorf("error exporting stats file: %s", err)
}
}
}

func (ct *cdcTester) startCRDBChaos() {
chaosStopper := make(chan time.Time)
ct.mon.Go(func(ctx context.Context) error {
@@ -468,18 +507,19 @@ func newCDCTester(ctx context.Context, t test.Test, c cluster.Cluster) cdcTester
if !t.SkipInit() {
tester.startGrafana()
}

return tester
}

func (ct *cdcTester) startGrafana() {
// Setup the prometheus instance on the workload node
cfg := (&prometheus.Config{}).
WithPrometheusNode(ct.workloadNode.InstallNodes()[0]).
WithCluster(ct.crdbNodes.InstallNodes()).
WithNodeExporter(ct.crdbNodes.InstallNodes()).
WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
cfg.Grafana.Enabled = true

ct.promCfg = cfg

err := ct.cluster.StartGrafana(ct.ctx, ct.t.L(), cfg)
if err != nil {
ct.t.Errorf("error starting prometheus/grafana: %s", err)
@@ -846,6 +886,7 @@ func registerCDC(r registry.Registry) {

ct.runTPCCWorkload(tpccArgs{warehouses: 100})

exportStatsFile := ct.startStatsCollection()
feed := ct.newChangefeed(feedArgs{
sinkType: kafkaSink,
targets: allTpccTargets,
Expand All @@ -855,6 +896,7 @@ func registerCDC(r registry.Registry) {
initialScanLatency: 30 * time.Minute,
})
feed.waitForCompletion()
exportStatsFile()
},
})
r.Add(registry.TestSpec{
42 changes: 42 additions & 0 deletions pkg/cmd/roachtest/tests/cdc_stats.go
@@ -0,0 +1,42 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"

var (
// sqlServiceLatency is the sql_service_latency_bucket prometheus metric.
sqlServiceLatency = clusterstats.ClusterStat{LabelName: "node", Query: "sql_service_latency_bucket"}
// sqlServiceLatencyAgg is the P99 latency of foreground SQL traffic across all nodes measured in ms.
sqlServiceLatencyAgg = clusterstats.AggQuery{
Stat: sqlServiceLatency,
Query: "histogram_quantile(0.99, sum by(le) (rate(sql_service_latency_bucket[2m]))) / (1000*1000)",
Tag: "P99 Foreground Latency (ms)",
}

// changefeedThroughput is the changefeed_emitted_bytes prometheus metric.
changefeedThroughput = clusterstats.ClusterStat{LabelName: "node", Query: "changefeed_emitted_bytes"}
// changefeedThroughputAgg is the total rate of bytes being emitted by a cluster measured in MB/s.
changefeedThroughputAgg = clusterstats.AggQuery{
Stat: changefeedThroughput,
Query: "sum(rate(changefeed_emitted_bytes[1m]) / (1000 * 1000))",
Tag: "Throughput (MBps)",
}

// cpuUsage is the sys_cpu_combined_percent_normalized prometheus metric per mode.
cpuUsage = clusterstats.ClusterStat{LabelName: "node", Query: "sys_cpu_combined_percent_normalized"}
// cpuUsageAgg is the average CPU usage across all nodes.
cpuUsageAgg = clusterstats.AggQuery{
Stat: cpuUsage,
Query: "avg(sys_cpu_combined_percent_normalized) * 100",
Tag: "CPU Utilization (%)",
}
)
1 change: 1 addition & 0 deletions pkg/roachpb/app_stats.go
@@ -124,6 +124,7 @@ func (t *TransactionStatistics) Add(other *TransactionStatistics) {
t.MaxRetries = other.MaxRetries
}

t.IdleLat.Add(other.IdleLat, t.Count, other.Count)
t.CommitLat.Add(other.CommitLat, t.Count, other.Count)
t.RetryLat.Add(other.RetryLat, t.Count, other.Count)
t.ServiceLat.Add(other.ServiceLat, t.Count, other.Count)
4 changes: 4 additions & 0 deletions pkg/roachpb/app_stats.proto
@@ -149,6 +149,10 @@ message TransactionStatistics {
// all statement operations have been applied.
optional NumericStat commit_lat = 6 [(gogoproto.nullable) = false];

// IdleLat is the cumulative amount of time spent in seconds waiting for
// the client to send statements while holding the transaction open.
optional NumericStat idle_lat = 11 [(gogoproto.nullable) = false];

// BytesRead collects the number of bytes read from disk.
optional NumericStat bytes_read = 7 [(gogoproto.nullable) = false];

5 changes: 5 additions & 0 deletions pkg/sql/conn_executor.go
@@ -1398,6 +1398,11 @@ type connExecutor struct {
shouldCollectTxnExecutionStats bool
// accumulatedStats are the accumulated stats of all statements.
accumulatedStats execstats.QueryLevelStats

// idleLatency is the cumulative amount of time spent waiting for the
// client to send statements while holding the transaction open.
idleLatency time.Duration

// rowsRead and bytesRead are separate from QueryLevelStats because they are
// accumulated independently since they are always collected, as opposed to
// QueryLevelStats which are sampled.
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor_exec.go
@@ -914,6 +914,9 @@ func (ex *connExecutor) resetTransactionOnSchemaChangeRetry(ctx context.Context)
func (ex *connExecutor) commitSQLTransaction(
ctx context.Context, ast tree.Statement, commitFn func(context.Context) error,
) (fsm.Event, fsm.EventPayload) {
ex.extraTxnState.idleLatency += ex.statsCollector.PhaseTimes().
GetIdleLatency(ex.statsCollector.PreviousPhaseTimes())

ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionStartTransactionCommit, timeutil.Now())
if err := commitFn(ctx); err != nil {
if descs.IsTwoVersionInvariantViolationError(err) {
@@ -2285,6 +2288,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
// accumulatedStats are cleared, but shouldCollectTxnExecutionStats is
// unchanged.
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.idleLatency = 0
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
@@ -2317,6 +2321,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
ex.extraTxnState.numRows = 0
ex.extraTxnState.shouldCollectTxnExecutionStats = false
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.idleLatency = 0
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
@@ -2396,6 +2401,7 @@ func (ex *connExecutor) recordTransactionFinish(
ServiceLatency: txnServiceLat,
RetryLatency: txnRetryLat,
CommitLatency: commitLat,
IdleLatency: ex.extraTxnState.idleLatency,
RowsAffected: ex.extraTxnState.numRows,
CollectedExecStats: ex.planner.instrumentation.collectExecStats,
ExecStats: ex.extraTxnState.accumulatedStats,
1 change: 1 addition & 0 deletions pkg/sql/executor_statement_metrics.go
@@ -252,6 +252,7 @@ func (ex *connExecutor) recordStatementSummary(
ex.extraTxnState.transactionStatementsHash.Add(uint64(stmtFingerprintID))
}
ex.extraTxnState.numRows += rowsAffected
ex.extraTxnState.idleLatency += idleLatRaw

if log.V(2) {
// ages since significant epochs
24 changes: 24 additions & 0 deletions pkg/sql/opt/ordering/ordering.go
@@ -31,6 +31,30 @@ func CanProvide(expr memo.RelExpr, required *props.OrderingChoice) bool {
return funcMap[expr.Op()].canProvideOrdering(expr, required)
}

// CanEnforce returns true if the output of the given operator can be sorted
// in order to satisfy the given required ordering.
func CanEnforce(expr memo.RelExpr, required *props.OrderingChoice) bool {
if required.Any() {
return false
}
if buildutil.CrdbTestBuild {
checkRequired(expr, required)
}
switch t := expr.(type) {
case *memo.ExplainExpr:
return false
case *memo.LookupJoinExpr:
// For paired joins we use a boolean continuation column to handle false
// positive matches in the first join. This relies on the ordering being
// unchanged between the first and second joins, so adding a sort on top
// of this expression could lead to incorrect results.
return !t.IsFirstJoinInPairedJoiner
case *memo.InvertedJoinExpr:
return !t.IsFirstJoinInPairedJoiner
}
return true
}

// BuildChildRequired returns the ordering that must be required of its
// given child in order to satisfy a required ordering. Can only be called if
// CanProvide is true for the required ordering.
2 changes: 1 addition & 1 deletion pkg/sql/opt/xform/optimizer.go
@@ -655,7 +655,7 @@ func (o *Optimizer) enforceProps(
return o.optimizeEnforcer(state, getEnforcer, required, member)
}

if !required.Ordering.Any() && member.Op() != opt.ExplainOp {
if ordering.CanEnforce(member, &required.Ordering) {
// Try Sort enforcer that requires no ordering from its input.
getEnforcer := func() memo.RelExpr {
enforcer := o.getScratchSort()