diff --git a/pkg/ccl/serverccl/testdata/api_v2_sql b/pkg/ccl/serverccl/testdata/api_v2_sql index ee1171781afa..0aedbfd2789a 100644 --- a/pkg/ccl/serverccl/testdata/api_v2_sql +++ b/pkg/ccl/serverccl/testdata/api_v2_sql @@ -13,6 +13,7 @@ sql admin "database": "system", "execute": false, "max_result_size": 10000, + "separate_txns": false, "statements": [ { "arguments": [ diff --git a/pkg/server/api_v2_sql.go b/pkg/server/api_v2_sql.go index 3d49e46bb348..ea7791771178 100644 --- a/pkg/server/api_v2_sql.go +++ b/pkg/server/api_v2_sql.go @@ -196,6 +196,7 @@ var SQLAPIClock timeutil.TimeSource = timeutil.DefaultTimeSource{} // type: array // description: The result rows. // items: {} + func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { // Type for the request. type requestType struct { @@ -204,12 +205,14 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { Database string `json:"database"` ApplicationName string `json:"application_name"` Execute bool `json:"execute"` + SeparateTxns bool `json:"separate_txns"` Statements []struct { SQL string `json:"sql"` stmt statements.Statement[tree.Statement] `json:"-"` Arguments []interface{} `json:"arguments,omitempty"` } `json:"statements"` } + // Type for the result. type txnResult struct { Statement int `json:"statement"` // index of statement in request. @@ -364,25 +367,61 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { options := []isql.TxnOption{ isql.WithPriority(admissionpb.NormalPri), } + + // execResultRunner is the function that runs the entirety of the request. + var execResultRunner func(context.Context, func(context.Context, isql.Txn) error, ...isql.TxnOption) error + // stmtRunner is the function that runs each statement in turn. + var stmtRunner func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error + var handleStmtErr func(outerErr, stmtErr error) (error, error) + + // Select which runners to use depending on the transaction mode. + if !requestPayload.SeparateTxns { + execResultRunner = a.sqlServer.internalDB.Txn + handleStmtErr = func(_, stmtErr error) (error, error) { + // In a single txn, any stmt err ends the txn, so the stmt err is also the outer err + return stmtErr, stmtErr + } + stmtRunner = func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, _ ...isql.TxnOption) error { + return queryFn(ctx, outerTxn) + } + } else { + execResultRunner = func(ctx context.Context, queryFn func(ctx context.Context, txn isql.Txn) error, _ ...isql.TxnOption) error { + return queryFn(ctx, nil /* txn */) + } + handleStmtErr = func(outerErr, stmtErr error) (error, error) { + // If we encounter a stmt error with separate txns, then set the outer error without returning it + if stmtErr != nil && outerErr == nil { + outerErr = errors.New("separate transaction payload encountered transaction error(s)") + } + return outerErr, nil + } + stmtRunner = func(ctx context.Context, _ isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error { + return a.sqlServer.internalDB.Txn(ctx, queryFn, opts...) + } + } + result.Execution = &execResult{} result.Execution.TxnResults = make([]txnResult, 0, len(requestPayload.Statements)) err = timeutil.RunWithTimeout(ctx, "run-sql-via-api", timeout, func(ctx context.Context) error { retryNum := 0 - - return a.sqlServer.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return execResultRunner(ctx, func(ctx context.Context, txn isql.Txn) error { result.Execution.TxnResults = result.Execution.TxnResults[:0] result.Execution.Retries = retryNum retryNum++ curSize := uintptr(0) + var outerErr error + checkSize := func(size uintptr) error { + if size > uintptr(requestPayload.MaxResultSize) { + return errors.New("max result size exceeded") + } + return nil + } addSize := func(row tree.Datums) error { for _, c := range row { curSize += c.Size() } - if curSize > uintptr(requestPayload.MaxResultSize) { - return errors.New("max result size exceeded") - } - return nil + return checkSize(curSize) } for stmtIdx, stmt := range requestPayload.Statements { @@ -395,7 +434,7 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { txnRes := &result.Execution.TxnResults[stmtIdx] returnType := stmt.stmt.AST.StatementReturnType() - stmtErr := func() (retErr error) { + stmtErr := stmtRunner(ctx, txn, func(ctx context.Context, txn isql.Txn) (retErr error) { txnRes.Start = jsonTime(SQLAPIClock.Now()) txnRes.Statement = stmtIdx + 1 txnRes.Tag = stmt.stmt.AST.StatementTag() @@ -407,6 +446,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { } }() + // If the max size has been exceeded by previous statements/transactions + // avoid executing, return immediately. + err := checkSize(curSize) + if err != nil { + return err + } + it, err := txn.QueryIteratorEx(ctx, "run-query-via-api", txn.KV(), sessiondata.InternalExecutorOverride{ User: username, @@ -442,12 +488,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { } } return err - }() + }, options...) + outerErr, stmtErr = handleStmtErr(outerErr, stmtErr) if stmtErr != nil { return stmtErr } } - return nil + return outerErr }, options...) }) if err != nil { diff --git a/pkg/server/testdata/api_v2_sql b/pkg/server/testdata/api_v2_sql index 3797ea0892c1..a2ca1faba8e7 100644 --- a/pkg/server/testdata/api_v2_sql +++ b/pkg/server/testdata/api_v2_sql @@ -11,6 +11,7 @@ sql admin "database": "system", "execute": false, "max_result_size": 10000, + "separate_txns": false, "statements": [ { "arguments": [ @@ -546,3 +547,360 @@ sql admin }, "num_statements": 1 } + +# Test that running queries in separate transactions returns expected results +# for each transaction. +sql admin +{ + "database": "testdb", + "execute": true, + "separate_txns": true, + "statements": [ + {"sql": "CREATE database testdb"}, + {"sql": "CREATE table testdb.test (id int)"}, + {"sql": "INSERT INTO testdb.test VALUES (1)"} + ] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "CREATE DATABASE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "CREATE TABLE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 1, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "INSERT" + } + ] + }, + "num_statements": 3 +} + +# Test that errors are localized to their respective transaction +# (i.e. an error on transaction 2 should not affect transaction 1 and 3). +# Ensure that the response level error is populated. +sql non-admin +{ + "database": "system", + "execute": true, + "separate_txns": true, + "statements": [ + {"sql": "SELECT username FROM users where username = 'admin'"}, + {"sql": "SELECT 1"}, + {"sql": "SELECT 1"} + ] +} +---- +{ + "error": { + "code": "XXUUU", + "message": "separate transaction payload encountered transaction error(s)", + "severity": "ERROR", + "source": { + "file": "api_v2_sql.go", + "function": "func6", + "line": 394 + } + }, + "execution": { + "txn_results": [ + { + "end": "1970-01-01T00:00:00Z", + "error": { + "code": "42501", + "message": "executing stmt 1: run-query-via-api: user authentic_user_noadmin does not have SELECT privilege on relation users", + "severity": "ERROR", + "source": { + "file": "authorization.go", + "function": "insufficientPrivilegeError", + "line": 1027 + } + }, + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "SELECT" + } + ] + }, + "num_statements": 3 +} + +# Test multiple localized transaction errors. +sql non-admin +{ + "database": "system", + "execute": true, + "separate_txns": true, + "statements": [ + {"sql": "SELECT username FROM users where username = 'admin'"}, + {"sql": "SELECT 1"}, + {"sql": "SELECT field FROM not_exist"} + ] +} +---- +{ + "error": { + "code": "XXUUU", + "message": "separate transaction payload encountered transaction error(s)", + "severity": "ERROR", + "source": { + "file": "api_v2_sql.go", + "function": "func6", + "line": 394 + } + }, + "execution": { + "txn_results": [ + { + "end": "1970-01-01T00:00:00Z", + "error": { + "code": "42501", + "message": "executing stmt 1: run-query-via-api: user authentic_user_noadmin does not have SELECT privilege on relation users", + "severity": "ERROR", + "source": { + "file": "authorization.go", + "function": "insufficientPrivilegeError", + "line": 1027 + } + }, + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "SELECT" + }, + { + "end": "1970-01-01T00:00:00Z", + "error": { + "code": "42P01", + "message": "executing stmt 3: run-query-via-api: relation \"not_exist\" does not exist", + "severity": "ERROR", + "source": { + "file": "errors.go", + "function": "NewUndefinedRelationError", + "line": 165 + } + }, + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "SELECT" + } + ] + }, + "num_statements": 3 +} + +# Test that total size is being tracked correctly across multiple transactions +# "SELECT 1" has a row size of 8. We set the max result size for the request +# to 23 and request execution of 4 "SELECT 1" statements, totalling a size of 32. +# We check that we receive a 'max size result exceeded' error on the expected statements (3 & 4) +# We check that query 4 does not execute as the max size limit is exceeded by query 3, prior to its execution. +sql admin +{ + "database": "system", + "execute": true, + "max_result_size": 23, + "separate_txns": true, + "statements": [ + {"sql": "SELECT 1"}, + {"sql": "SELECT 1"}, + {"sql": "SELECT 1"}, + {"sql": "SELECT 1"} + ] +} +---- +{ + "error": { + "code": "XXUUU", + "message": "separate transaction payload encountered transaction error(s)", + "severity": "ERROR", + "source": { + "file": "api_v2_sql.go", + "function": "func6", + "line": 394 + } + }, + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "error": { + "code": "XXUUU", + "message": "executing stmt 3: max result size exceeded", + "severity": "ERROR", + "source": { + "file": "api_v2_sql.go", + "function": "1", + "line": 416 + } + }, + "rows": [ + { + "?column?": 1 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "SELECT" + }, + { + "end": "1970-01-01T00:00:00Z", + "error": { + "code": "XXUUU", + "message": "executing stmt 4: max result size exceeded", + "severity": "ERROR", + "source": { + "file": "api_v2_sql.go", + "function": "1", + "line": 416 + } + }, + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 4, + "tag": "SELECT" + } + ] + }, + "num_statements": 4 +}