Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: SHOW QUERIES lazily interpolates placeholders #86968

Merged
merged 3 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,7 @@ ActiveQuery represents a query in flight on some Session.
| is_full_scan | [bool](#cockroach.server.serverpb.ListSessionsResponse-bool) | | True if the query contains a full table or index scan. Note that this field is only valid if the query is in the EXECUTING phase. | [reserved](#support-status) |
| elapsed_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | Time elapsed since this query started execution. | [reserved](#support-status) |
| plan_gist | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The compressed plan that can be converted back into the statement's logical plan. Empty if the statement is in the PREPARING state. | [reserved](#support-status) |
| placeholders | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | repeated | The placeholders if any. | [reserved](#support-status) |



Expand Down Expand Up @@ -2283,6 +2284,7 @@ ActiveQuery represents a query in flight on some Session.
| is_full_scan | [bool](#cockroach.server.serverpb.ListSessionsResponse-bool) | | True if the query contains a full table or index scan. Note that this field is only valid if the query is in the EXECUTING phase. | [reserved](#support-status) |
| elapsed_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | Time elapsed since this query started execution. | [reserved](#support-status) |
| plan_gist | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The compressed plan that can be converted back into the statement's logical plan. Empty if the statement is in the PREPARING state. | [reserved](#support-status) |
| placeholders | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | repeated | The placeholders if any. | [reserved](#support-status) |



Expand Down
4 changes: 4 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,10 @@ message ActiveQuery {
// The compressed plan that can be converted back into the statement's logical plan.
// Empty if the statement is in the PREPARING state.
string plan_gist = 12;

// The placeholders if any.
repeated string placeholders = 13;

}

// Request object for ListSessions and ListLocalSessions.
Expand Down
17 changes: 11 additions & 6 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3177,12 +3177,16 @@ func (ex *connExecutor) serialize() serverpb.Session {
if query.hidden {
continue
}
ast, err := query.getStatement()
if err != nil {
continue
sqlNoConstants := truncateSQL(formatStatementHideConstants(query.stmt.AST))
nPlaceholders := 0
if query.placeholders != nil {
nPlaceholders = len(query.placeholders.Values)
}
placeholders := make([]string, nPlaceholders)
for i := range placeholders {
placeholders[i] = tree.AsStringWithFlags(query.placeholders.Values[i], tree.FmtSimple)
}
sqlNoConstants := truncateSQL(formatStatementHideConstants(ast))
sql := truncateSQL(ast.String())
sql := truncateSQL(query.stmt.SQL)
progress := math.Float64frombits(atomic.LoadUint64(&query.progressAtomic))
queryStart := query.start.UTC()
activeQueries = append(activeQueries, serverpb.ActiveQuery{
Expand All @@ -3192,7 +3196,8 @@ func (ex *connExecutor) serialize() serverpb.Session {
ElapsedTime: timeNow.Sub(queryStart),
Sql: sql,
SqlNoConstants: sqlNoConstants,
SqlSummary: formatStatementSummary(ast),
SqlSummary: formatStatementSummary(query.stmt.AST),
Placeholders: placeholders,
IsDistributed: query.isDistributed,
Phase: (serverpb.ActiveQuery_Phase)(query.phase),
Progress: float32(progress),
Expand Down
20 changes: 9 additions & 11 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,9 @@ func (ex *connExecutor) execStmtInOpenState(
var queryDoneAfterFunc chan struct{}
var txnDoneAfterFunc chan struct{}

// Early-associate placeholder info with the eval context,
// so that we can fill in placeholder values in our call to addActiveQuery, below.
if !ex.planner.EvalContext().HasPlaceholders() {
ex.planner.EvalContext().Placeholders = pinfo
}

var cancelQuery context.CancelFunc
ctx, cancelQuery = contextutil.WithCancel(ctx)
ex.addActiveQuery(ast, formatWithPlaceholders(ctx, ast, ex.planner.EvalContext()), queryID, cancelQuery)
ex.addActiveQuery(parserStmt, pinfo, queryID, cancelQuery)

// Make sure that we always unregister the query. It also deals with
// overwriting res.Error to a more user-friendly message in case of query
Expand Down Expand Up @@ -547,7 +541,7 @@ func (ex *connExecutor) execStmtInOpenState(
if perr, ok := retPayload.(payloadWithError); ok {
execErr = perr.errorCause()
}
filter(ctx, ex.sessionData(), ast.String(), execErr)
filter(ctx, ex.sessionData(), stmt.AST.String(), execErr)
}

// Do the auto-commit, if necessary. In the extended protocol, the
Expand Down Expand Up @@ -2106,13 +2100,17 @@ func (ex *connExecutor) enableTracing(modes []string) error {

// addActiveQuery adds a running query to the list of running queries.
func (ex *connExecutor) addActiveQuery(
ast tree.Statement, rawStmt string, queryID clusterunique.ID, cancelQuery context.CancelFunc,
stmt parser.Statement,
placeholders *tree.PlaceholderInfo,
queryID clusterunique.ID,
cancelQuery context.CancelFunc,
) {
_, hidden := ast.(tree.HiddenFromShowQueries)
_, hidden := stmt.AST.(tree.HiddenFromShowQueries)
qm := &queryMeta{
txnID: ex.state.mu.txn.ID(),
start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived),
rawStmt: rawStmt,
stmt: stmt,
placeholders: placeholders,
phase: preparing,
isDistributed: false,
isFullScan: false,
Expand Down
35 changes: 33 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -1891,14 +1892,16 @@ func populateQueriesTable(
planGistDatum = tree.NewDString(query.PlanGist)
}

// Interpolate placeholders into the SQL statement.
sql := formatActiveQuery(query)
if err := addRow(
tree.NewDString(query.ID),
txnID,
tree.NewDInt(tree.DInt(session.NodeID)),
sessionID,
tree.NewDString(session.Username),
ts,
tree.NewDString(query.Sql),
tree.NewDString(sql),
tree.NewDString(session.ClientAddress),
tree.NewDString(session.ApplicationName),
isDistributedDatum,
Expand Down Expand Up @@ -1938,6 +1941,33 @@ func populateQueriesTable(
return nil
}

// formatActiveQuery formats a serverpb.ActiveQuery by interpolating its
// placeholders within the string.
func formatActiveQuery(query serverpb.ActiveQuery) string {
parsed, parseErr := parser.ParseOne(query.Sql)
if parseErr != nil {
// If we failed to interpolate, rather than give up just send out the
// SQL without interpolated placeholders. Hallelujah!
return query.Sql
}
var sb strings.Builder
sql := tree.AsStringWithFlags(parsed.AST, tree.FmtSimple,
tree.FmtPlaceholderFormat(func(ctx *tree.FmtCtx, p *tree.Placeholder) {
if int(p.Idx) > len(query.Placeholders) {
ctx.Printf("$%d", p.Idx+1)
return
}
ctx.Printf(query.Placeholders[p.Idx])
}),
)
sb.WriteString(sql)
for i := range parsed.Comments {
sb.WriteString(" ")
sb.WriteString(parsed.Comments[i])
}
return sb.String()
}

const sessionsSchemaPattern = `
CREATE TABLE crdb_internal.%s (
node_id INT NOT NULL, -- the node on which the query is running
Expand Down Expand Up @@ -2008,7 +2038,8 @@ func populateSessionsTable(
// The array is leftover from a time when we allowed multiple
// queries to be executed at once in a session.
activeQueryStart = query.Start
activeQueries.WriteString(query.Sql)
sql := formatActiveQuery(query)
activeQueries.WriteString(sql)
}

var err error
Expand Down
19 changes: 5 additions & 14 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,10 +1928,11 @@ type queryMeta struct {
// The timestamp when this query began execution.
start time.Time

// The string of the SQL statement being executed. This string may
// contain sensitive information, so it must be converted back into
// an AST and dumped before use in logging.
rawStmt string
// The SQL statement being executed.
stmt parser.Statement

// The placeholders that the query was executed with if any.
placeholders *tree.PlaceholderInfo

// States whether this query is distributed. Note that all queries,
// including those that are distributed, have this field set to false until
Expand Down Expand Up @@ -1970,16 +1971,6 @@ func (q *queryMeta) cancel() {
q.cancelQuery()
}

// getStatement returns a cleaned version of the query associated
// with this queryMeta.
func (q *queryMeta) getStatement() (tree.Statement, error) {
parsed, err := parser.ParseOne(q.rawStmt)
if err != nil {
return nil, err
}
return parsed.AST, nil
}

// SessionDefaults mirrors fields in Session, for restoring default
// configuration values in SET ... TO DEFAULT (or RESET ...) statements.
type SessionDefaults map[string]string
Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/parser/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Statement struct {
// AST is the root of the AST tree for the parsed statement.
AST tree.Statement

// Comments is the list of parsed SQL comments.
Comments []string

// SQL is the original SQL from which the statement was parsed. Note that this
// is not appropriate for use in logging, as it may contain passwords and
// other sensitive data.
Expand Down Expand Up @@ -182,7 +185,6 @@ func (p *Parser) scanOneStmt() (sql string, tokens []sqlSymType, done bool) {
return p.scanner.In()[startPos:], tokens, true
}
preValID = lval.id
posBeforeScan := p.scanner.Pos()
tokens = append(tokens, sqlSymType{})
lval = &tokens[len(tokens)-1]
p.scanner.Scan(lval)
Expand All @@ -194,8 +196,13 @@ func (p *Parser) scanOneStmt() (sql string, tokens []sqlSymType, done bool) {
curFuncBodyCnt--
}
if lval.id == 0 || (curFuncBodyCnt == 0 && lval.id == ';') {
endPos := p.scanner.Pos()
if lval.id == ';' {
// Don't include the ending semicolon, if there is one, in the raw SQL.
endPos--
}
tokens = tokens[:len(tokens)-1]
return p.scanner.In()[startPos:posBeforeScan], tokens, (lval.id == 0)
return p.scanner.In()[startPos:endPos], tokens, (lval.id == 0)
}
lval.pos -= startPos
}
Expand Down Expand Up @@ -253,6 +260,7 @@ func (p *Parser) parse(
return Statement{
AST: p.lexer.stmt,
SQL: sql,
Comments: p.scanner.Comments,
NumPlaceholders: p.lexer.numPlaceholders,
NumAnnotations: p.lexer.numAnnotations,
}, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/parser/parse_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestScanOneStmt(t *testing.T) {
{
sql: `SELECT 1 /* comment */ ; /* comment */ ; /* comment */ `,
exp: []stmt{{
sql: `SELECT 1`,
sql: `SELECT 1 /* comment */ `,
tok: []int{SELECT, ICONST},
}},
},
Expand Down Expand Up @@ -73,11 +73,11 @@ func TestScanOneStmt(t *testing.T) {
sql: ` ; /* x */ SELECT 1 ; SET /* y */ ; ; INSERT INTO table; ;`,
exp: []stmt{
{
sql: `SELECT 1`,
sql: `SELECT 1 `,
tok: []int{SELECT, ICONST},
},
{
sql: `SET`,
sql: `SET /* y */ `,
tok: []int{SET},
},
{
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,10 +639,12 @@ func TestParseSQL(t *testing.T) {
{in: ``, exp: nil},
{in: `SELECT 1`, exp: []string{`SELECT 1`}},
{in: `SELECT 1;`, exp: []string{`SELECT 1`}},
{in: `SELECT 1 /* comment */`, exp: []string{`SELECT 1`}},
// We currently chop off beginning-of-line comments.
{in: `/* comment */ SELECT 1`, exp: []string{`SELECT 1`}},
{in: `SELECT 1 /* comment */`, exp: []string{`SELECT 1 /* comment */`}},
{in: `SELECT 1;SELECT 2`, exp: []string{`SELECT 1`, `SELECT 2`}},
{in: `SELECT 1 /* comment */ ;SELECT 2`, exp: []string{`SELECT 1`, `SELECT 2`}},
{in: `SELECT 1 /* comment */ ; /* comment */ SELECT 2`, exp: []string{`SELECT 1`, `SELECT 2`}},
{in: `SELECT 1 /* comment */ ;SELECT 2`, exp: []string{`SELECT 1 /* comment */ `, `SELECT 2`}},
{in: `SELECT 1 /* comment */ ; SELECT 2`, exp: []string{`SELECT 1 /* comment */ `, `SELECT 2`}},
}
var p parser.Parser // Verify that the same parser can be reused.
for _, d := range testData {
Expand All @@ -656,7 +658,7 @@ func TestParseSQL(t *testing.T) {
res = append(res, stmts[i].SQL)
}
if !reflect.DeepEqual(res, d.exp) {
t.Errorf("expected \n%v\n, but found %v", res, d.exp)
t.Errorf("expected \n%v\n, but found %v", d.exp, res)
}
})
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/scanner/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Scanner struct {
in string
pos int
bytesPrealloc []byte

// Comments is the list of parsed comments from the SQL statement.
Comments []string
}

// In returns the input string.
Expand Down Expand Up @@ -453,6 +456,7 @@ func (s *Scanner) next() int {
func (s *Scanner) skipWhitespace(lval ScanSymType, allowComments bool) (newline, ok bool) {
newline = false
for {
startPos := s.pos
ch := s.peek()
if ch == '\n' {
s.pos++
Expand All @@ -467,6 +471,8 @@ func (s *Scanner) skipWhitespace(lval ScanSymType, allowComments bool) (newline,
if present, cok := s.ScanComment(lval); !cok {
return false, false
} else if present {
// Mark down the comments that we found.
s.Comments = append(s.Comments, s.in[startPos:s.pos])
continue
}
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/sql/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,21 @@ func TestShowQueriesFillsInValuesForPlaceholders(t *testing.T) {
[]interface{}{"hello"},
"SELECT upper('hello')",
},
{
"SELECT /* test */ upper($1)",
[]interface{}{"hello"},
"SELECT upper('hello') /* test */",
},
{
"SELECT /* test */ 'hi'::string",
[]interface{}{},
"SELECT 'hi'::STRING /* test */",
},
{
"SELECT /* test */ 'hi'::string /* fnord */",
[]interface{}{},
"SELECT 'hi'::STRING /* test */ /* fnord */",
},
}

// Perform both as a simple execution and as a prepared statement,
Expand Down Expand Up @@ -761,7 +776,14 @@ func TestShowQueriesFillsInValuesForPlaceholders(t *testing.T) {
t.Fatal(err)
}

require.Equal(t, test.expected, recordedQueries[test.statement])
// parse and stringify the statement so that it matches the key in the
// recordedQueries map.
stmt, err := parser.ParseOne(test.statement)
if err != nil {
t.Fatal(err)
}
sql := stmt.AST.String()
require.Equal(t, test.expected, recordedQueries[sql])
})
}
}
Expand Down