Skip to content

Commit

Permalink
Merge #85907 #85921
Browse files Browse the repository at this point in the history
85907: sql: fix log spam during idx recommendation if txn was closed r=maryliag a=maryliag

Previously, if a transaction (from planner.txn) was already closed,
due to one-phase commit optimization or an error, we were still
trying to generate a recommendation, causing log spam.
This commit now checks if we have a transaction open that can be
used, otherwise it won't try to generate recommendation.

This commit also updates the cache, even if the recommendation
generations fails, this way we won't keep trying constantly,
and instead will wait for the next hour to try again.

Fixes #85875

Release note: None

85921: sql: use the correct txn when creating external connection r=benbardin a=adityamaru

We need to use the planners' txn when creating an external
connection. Using a DB().Txn here was an oversight.

Release note: None

Co-authored-by: Marylia Gutierrez <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
3 people committed Aug 11, 2022
3 parents 0ad97e4 + 477d1e8 + 5391bb2 commit 43bfc31
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 56 deletions.
9 changes: 7 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,8 +1197,13 @@ func (ex *connExecutor) dispatchToExecutionEngine(

populateQueryLevelStats(ctx, planner)

// Set index recommendations so it can be saved on statement statistics.
planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner)
// The transaction (from planner.txn) may already have been committed at this point,
// due to one-phase commit optimization or an error. Since we use that transaction
// on the optimizer, check if is still open before generating index recommendations.
if planner.txn.IsOpen() {
// Set index recommendations, so it can be saved on statement statistics.
planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner)
}

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/create_external_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -111,9 +110,7 @@ func (p *planner) createExternalConnection(

// Create the External Connection and persist it in the
// `system.external_connections` table.
return params.ExecCfg().DB.Txn(params.ctx, func(ctx context.Context, txn *kv.Txn) error {
return ex.Create(params.ctx, params.ExecCfg().InternalExecutor, params.p.User(), txn)
})
return ex.Create(params.ctx, params.ExecCfg().InternalExecutor, params.p.User(), p.Txn())
}

func (c *createExternalConectionNode) Next(_ runParams) (bool, error) { return false, nil }
Expand Down
83 changes: 51 additions & 32 deletions pkg/sql/idxrecommendations/idx_recommendations_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,63 @@ func TestIndexRecommendationsStats(t *testing.T) {
testConn := sqlutils.MakeSQLRunner(sqlConn)
testConn.Exec(t, "CREATE DATABASE idxrectest")
testConn.Exec(t, "USE idxrectest")
testConn.Exec(t, "CREATE TABLE t ( k INT PRIMARY KEY, v INT, FAMILY \"primary\" (k, v))")
testConn.Exec(t, "CREATE TABLE t1 (k INT, i INT, f FLOAT, s STRING)")
testConn.Exec(t, "CREATE TABLE t2 (k INT, i INT, s STRING)")
testConn.Exec(t, "CREATE UNIQUE INDEX existing_t1_i ON t1(i)")
minExecutions := 5
var recommendations string

testCases := []struct {
stmt string
fingerprint string
recommendations string
}{
{
stmt: "SELECT * FROM t WHERE v > 123",
fingerprint: "SELECT * FROM t WHERE v > _",
recommendations: "{\"creation : CREATE INDEX ON t (v);\"}",
},
{
stmt: "SELECT t1.k FROM t1 JOIN t2 ON t1.k = t2.k WHERE t1.i > 3 AND t2.i > 3",
fingerprint: "SELECT t1.k FROM t1 JOIN t2 ON t1.k = t2.k WHERE (t1.i > _) AND (t2.i > _)",
recommendations: "{\"replacement : CREATE UNIQUE INDEX ON t1 (i) STORING (k); DROP INDEX t1@existing_t1_i;\"," +
"\"creation : CREATE INDEX ON t2 (i) STORING (k);\"}",
},
}
t.Run("index recommendations generated", func(t *testing.T) {
testConn.Exec(t, "CREATE TABLE t ( k INT PRIMARY KEY, v INT, FAMILY \"primary\" (k, v))")
testConn.Exec(t, "CREATE TABLE t1 (k INT, i INT, f FLOAT, s STRING)")
testConn.Exec(t, "CREATE TABLE t2 (k INT, i INT, s STRING)")
testConn.Exec(t, "CREATE UNIQUE INDEX existing_t1_i ON t1(i)")

var recommendations string
for i := 0; i < 8; i++ {
for _, tc := range testCases {
testConn.Exec(t, tc.stmt)
testCases := []struct {
stmt string
fingerprint string
recommendations string
}{
{
stmt: "SELECT * FROM t WHERE v > 123",
fingerprint: "SELECT * FROM t WHERE v > _",
recommendations: "{\"creation : CREATE INDEX ON t (v);\"}",
},
{
stmt: "SELECT t1.k FROM t1 JOIN t2 ON t1.k = t2.k WHERE t1.i > 3 AND t2.i > 3",
fingerprint: "SELECT t1.k FROM t1 JOIN t2 ON t1.k = t2.k WHERE (t1.i > _) AND (t2.i > _)",
recommendations: "{\"replacement : CREATE UNIQUE INDEX ON t1 (i) STORING (k); DROP INDEX t1@existing_t1_i;\"," +
"\"creation : CREATE INDEX ON t2 (i) STORING (k);\"}",
},
}

for i := 0; i < (minExecutions + 2); i++ {
for _, tc := range testCases {
testConn.Exec(t, tc.stmt)
rows := testConn.QueryRow(t, "SELECT index_recommendations FROM CRDB_INTERNAL.STATEMENT_STATISTICS "+
" WHERE metadata ->> 'db' = 'idxrectest' AND metadata ->> 'query'=$1", tc.fingerprint)
rows.Scan(&recommendations)

expected := tc.recommendations
if i < minExecutions {
expected = "{}"
}
require.Equal(t, expected, recommendations)
}
}
})

t.Run("index recommendations not generated for one-phase commit "+
"optimization statement", func(t *testing.T) {
testConn.Exec(t, "CREATE TABLE t3 (k INT PRIMARY KEY)")

for i := 0; i < (minExecutions + 2); i++ {
testConn.Exec(t, `INSERT INTO t3 VALUES($1)`, i)
rows := testConn.QueryRow(t, "SELECT index_recommendations FROM CRDB_INTERNAL.STATEMENT_STATISTICS "+
" WHERE metadata ->> 'db' = 'idxrectest' AND metadata ->> 'query'=$1", tc.fingerprint)
" WHERE metadata ->> 'db' = 'idxrectest' AND metadata ->> 'query' = 'INSERT INTO t3 VALUES ($1)'")
rows.Scan(&recommendations)

expected := tc.recommendations
if i < 5 {
expected = "{}"
}
require.Equal(t, expected, recommendations)
require.Equal(t, "{}", recommendations)
}
}

})
}

func TestFormatIdxRecommendations(t *testing.T) {
Expand Down
30 changes: 12 additions & 18 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
opc.reset(ctx)
stmtType := opc.p.stmt.AST.StatementType()

reset := false
var recommendations []string
if idxRec.ShouldGenerateIndexRecommendation(
ih.fingerprint,
ih.planGist.Hash(),
Expand Down Expand Up @@ -713,25 +715,17 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
err = opc.makeQueryIndexRecommendation(ctx)
if err != nil {
log.Warningf(ctx, "unable to generate index recommendations: %s", err)
} else {
idxRec.UpdateIndexRecommendations(
ih.fingerprint,
ih.planGist.Hash(),
planner.SessionData().Database,
stmtType,
ih.indexRecommendations,
true,
)
}
}
} else {
ih.indexRecommendations = idxRec.UpdateIndexRecommendations(
ih.fingerprint,
ih.planGist.Hash(),
planner.SessionData().Database,
stmtType,
[]string{},
false,
)
reset = true
recommendations = ih.indexRecommendations
}
ih.indexRecommendations = idxRec.UpdateIndexRecommendations(
ih.fingerprint,
ih.planGist.Hash(),
planner.SessionData().Database,
stmtType,
recommendations,
reset,
)
}

0 comments on commit 43bfc31

Please sign in to comment.