Skip to content

Commit

Permalink
server: support separate transactions on sql api
Browse files Browse the repository at this point in the history
Part of: cockroachdb#102386

This change introduces a `separate_txns` field to the sql api endpoint.
This field allows callers to run provided statements in separate
transactions. Failures in separate transactions are scoped to their
respective transaction and do not interfere with the execution of
subsequent transactions. The exception to this case is if the max
response size is exceeded. If the response size is exceeded in an
earlier transaction, it will still exceed the limit in subsequent
transactions.  The response-level error using `separate_txns` is a
generic message: `separate transaction payload encountered transaction
error(s)`, indicating that an error has occurred in at least one of the
transactions.

Release note: None
  • Loading branch information
Thomas Hardy committed Aug 10, 2023
1 parent 2c650af commit d116a38
Show file tree
Hide file tree
Showing 3 changed files with 415 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/testdata/api_v2_sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ sql admin
"database": "system",
"execute": false,
"max_result_size": 10000,
"separate_txns": false,
"statements": [
{
"arguments": [
Expand Down
65 changes: 56 additions & 9 deletions pkg/server/api_v2_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var SQLAPIClock timeutil.TimeSource = timeutil.DefaultTimeSource{}
// type: array
// description: The result rows.
// items: {}

func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
// Type for the request.
type requestType struct {
Expand All @@ -204,12 +205,14 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
Database string `json:"database"`
ApplicationName string `json:"application_name"`
Execute bool `json:"execute"`
SeparateTxns bool `json:"separate_txns"`
Statements []struct {
SQL string `json:"sql"`
stmt statements.Statement[tree.Statement] `json:"-"`
Arguments []interface{} `json:"arguments,omitempty"`
} `json:"statements"`
}

// Type for the result.
type txnResult struct {
Statement int `json:"statement"` // index of statement in request.
Expand Down Expand Up @@ -364,25 +367,61 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
options := []isql.TxnOption{
isql.WithPriority(admissionpb.NormalPri),
}

// execResultRunner is the function that runs the entirety of the request.
var execResultRunner func(context.Context, func(context.Context, isql.Txn) error, ...isql.TxnOption) error
// stmtRunner is the function that runs each statement in turn.
var stmtRunner func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error
var handleStmtErr func(outerErr, stmtErr error) (error, error)

// Select which runners to use depending on the transaction mode.
if !requestPayload.SeparateTxns {
execResultRunner = a.sqlServer.internalDB.Txn
handleStmtErr = func(_, stmtErr error) (error, error) {
// In a single txn, any stmt err ends the txn, so the stmt err is also the outer err
return stmtErr, stmtErr
}
stmtRunner = func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, _ ...isql.TxnOption) error {
return queryFn(ctx, outerTxn)
}
} else {
execResultRunner = func(ctx context.Context, queryFn func(ctx context.Context, txn isql.Txn) error, _ ...isql.TxnOption) error {
return queryFn(ctx, nil /* txn */)
}
handleStmtErr = func(outerErr, stmtErr error) (error, error) {
// If we encounter a stmt error with separate txns, then set the outer error without returning it
if stmtErr != nil && outerErr == nil {
outerErr = errors.New("separate transaction payload encountered transaction error(s)")
}
return outerErr, nil
}
stmtRunner = func(ctx context.Context, _ isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error {
return a.sqlServer.internalDB.Txn(ctx, queryFn, opts...)
}
}

result.Execution = &execResult{}
result.Execution.TxnResults = make([]txnResult, 0, len(requestPayload.Statements))

err = timeutil.RunWithTimeout(ctx, "run-sql-via-api", timeout, func(ctx context.Context) error {
retryNum := 0

return a.sqlServer.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return execResultRunner(ctx, func(ctx context.Context, txn isql.Txn) error {
result.Execution.TxnResults = result.Execution.TxnResults[:0]
result.Execution.Retries = retryNum
retryNum++
curSize := uintptr(0)
var outerErr error
checkSize := func(size uintptr) error {
if size > uintptr(requestPayload.MaxResultSize) {
return errors.New("max result size exceeded")
}
return nil
}
addSize := func(row tree.Datums) error {
for _, c := range row {
curSize += c.Size()
}
if curSize > uintptr(requestPayload.MaxResultSize) {
return errors.New("max result size exceeded")
}
return nil
return checkSize(curSize)
}

for stmtIdx, stmt := range requestPayload.Statements {
Expand All @@ -395,7 +434,7 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
txnRes := &result.Execution.TxnResults[stmtIdx]

returnType := stmt.stmt.AST.StatementReturnType()
stmtErr := func() (retErr error) {
stmtErr := stmtRunner(ctx, txn, func(ctx context.Context, txn isql.Txn) (retErr error) {
txnRes.Start = jsonTime(SQLAPIClock.Now())
txnRes.Statement = stmtIdx + 1
txnRes.Tag = stmt.stmt.AST.StatementTag()
Expand All @@ -407,6 +446,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
}
}()

// If the max size has been exceeded by previous statements/transactions
// avoid executing, return immediately.
err := checkSize(curSize)
if err != nil {
return err
}

it, err := txn.QueryIteratorEx(ctx, "run-query-via-api", txn.KV(),
sessiondata.InternalExecutorOverride{
User: username,
Expand Down Expand Up @@ -442,12 +488,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
}
}
return err
}()
}, options...)
outerErr, stmtErr = handleStmtErr(outerErr, stmtErr)
if stmtErr != nil {
return stmtErr
}
}
return nil
return outerErr
}, options...)
})
if err != nil {
Expand Down
Loading

0 comments on commit d116a38

Please sign in to comment.