Skip to content

Commit

Permalink
server: support separate transactions on sql api
Browse files Browse the repository at this point in the history
Part of: #102386

This change introduces a `separate_txns` field to the sql api endpoint.
This field allows callers to run provided statements in separate
transactions. Failures in separate transactions are scoped to their
respective transaction and do not interfere with the execution of
subsequent transactions. The exception to this case is if the max
response size is exceeded. If the response size is exceeded in an
earlier transaction, it will still exceed the limit in subsequent
transactions.  The response-level error using `separate_txns` is a
generic message: `separate transaction payload encountered transaction
error(s)`, indicating that an error has occurred in at least one of the
transactions.

We additionally add a `stop_on_error` field to allow callers to stop
execution of subsequent txns (i.e. in the case where `separate_txns` is
true).

Release note: None
  • Loading branch information
Thomas Hardy committed Aug 9, 2023
1 parent 05b6181 commit 6fabd1d
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 17 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/serverccl/testdata/api_v2_sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ sql admin
"database": "system",
"execute": false,
"max_result_size": 10000,
"separate_txns": false,
"statements": [
{
"arguments": [
Expand All @@ -21,6 +22,7 @@ sql admin
"sql": "SELECT username FROM users WHERE username = $1"
}
],
"stop_on_error": false,
"timeout": "5s"
}
}
Expand Down
78 changes: 67 additions & 11 deletions pkg/server/api_v2_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var SQLAPIClock timeutil.TimeSource = timeutil.DefaultTimeSource{}
// type: array
// description: The result rows.
// items: {}

func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
// Type for the request.
type requestType struct {
Expand All @@ -204,12 +205,15 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
Database string `json:"database"`
ApplicationName string `json:"application_name"`
Execute bool `json:"execute"`
SeparateTxns bool `json:"separate_txns"`
StopOnError bool `json:"stop_on_error"`
Statements []struct {
SQL string `json:"sql"`
stmt statements.Statement[tree.Statement] `json:"-"`
Arguments []interface{} `json:"arguments,omitempty"`
} `json:"statements"`
}

// Type for the result.
type txnResult struct {
Statement int `json:"statement"` // index of statement in request.
Expand Down Expand Up @@ -364,25 +368,68 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
options := []isql.TxnOption{
isql.WithPriority(admissionpb.NormalPri),
}

// execResultRunner is the function that runs the entirety of the request.
var execResultRunner func(context.Context, func(context.Context, isql.Txn) error, ...isql.TxnOption) error
// stmtRunner is the function that runs each statement in turn.
var stmtRunner func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error
var handleStmtErr func(outerErr, stmtErr error) (err error, terminate bool)

// Select which runners to use depending on the transaction mode.
if !requestPayload.SeparateTxns {
execResultRunner = a.sqlServer.internalDB.Txn
handleStmtErr = func(_, stmtErr error) (error, bool) {
// In a single txn, any stmt err ends the txn
return stmtErr, stmtErr != nil
}
stmtRunner = func(ctx context.Context, outerTxn isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, _ ...isql.TxnOption) error {
return queryFn(ctx, outerTxn)
}
} else {
execResultRunner = func(ctx context.Context, queryFn func(ctx context.Context, txn isql.Txn) error, _ ...isql.TxnOption) error {
return queryFn(ctx, nil /* txn */)
}
handleStmtErr = func(outerErr, stmtErr error) (error, bool) {
// If we encounter a stmt error with separate txns, set the outer error
if stmtErr != nil {
if outerErr == nil {
outerErr = errors.New("separate transaction payload encountered transaction error(s)")
}
// If StopOnError is specified, return the outerErr and terminate
if requestPayload.StopOnError {
return outerErr, true
}
}
// Return outerErr without terminating
return outerErr, false
}
stmtRunner = func(ctx context.Context, _ isql.Txn, queryFn func(ctx context.Context, innerTxn isql.Txn) error, opts ...isql.TxnOption) error {
return a.sqlServer.internalDB.Txn(ctx, queryFn, opts...)
}
}

result.Execution = &execResult{}
result.Execution.TxnResults = make([]txnResult, 0, len(requestPayload.Statements))

err = contextutil.RunWithTimeout(ctx, "run-sql-via-api", timeout, func(ctx context.Context) error {
retryNum := 0

return a.sqlServer.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return execResultRunner(ctx, func(ctx context.Context, txn isql.Txn) error {
result.Execution.TxnResults = result.Execution.TxnResults[:0]
result.Execution.Retries = retryNum
retryNum++
curSize := uintptr(0)
var outerErr error
checkSize := func(size uintptr) error {
if size > uintptr(requestPayload.MaxResultSize) {
return errors.New("max result size exceeded")
}
return nil
}
addSize := func(row tree.Datums) error {
for _, c := range row {
curSize += c.Size()
}
if curSize > uintptr(requestPayload.MaxResultSize) {
return errors.New("max result size exceeded")
}
return nil
return checkSize(curSize)
}

for stmtIdx, stmt := range requestPayload.Statements {
Expand All @@ -395,7 +442,7 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
txnRes := &result.Execution.TxnResults[stmtIdx]

returnType := stmt.stmt.AST.StatementReturnType()
stmtErr := func() (retErr error) {
stmtErr := stmtRunner(ctx, txn, func(ctx context.Context, txn isql.Txn) (retErr error) {
txnRes.Start = jsonTime(SQLAPIClock.Now())
txnRes.Statement = stmtIdx + 1
txnRes.Tag = stmt.stmt.AST.StatementTag()
Expand All @@ -407,6 +454,13 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
}
}()

// If the max size has been exceeded by previous statements/transactions
// avoid executing, return immediately.
err := checkSize(curSize)
if err != nil {
return err
}

it, err := txn.QueryIteratorEx(ctx, "run-query-via-api", txn.KV(),
sessiondata.InternalExecutorOverride{
User: username,
Expand Down Expand Up @@ -442,12 +496,14 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) {
}
}
return err
}()
if stmtErr != nil {
return stmtErr
}, options...)
handledErr, terminate := handleStmtErr(outerErr, stmtErr)
outerErr = handledErr
if terminate {
return outerErr
}
}
return nil
return outerErr
}, options...)
})
if err != nil {
Expand Down
44 changes: 38 additions & 6 deletions pkg/server/api_v2_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,44 @@ func TestExecSQL(t *testing.T) {
require.NoError(t, err)
return fmt.Sprintf("%s|%s", er.Error.Code, er.Error.Message)
}
var u interface{}
err = json.Unmarshal(r, &u)
require.NoError(t, err)
s, err := json.MarshalIndent(u, "", " ")
require.NoError(t, err)
return string(s)
return getMarshalledResponse(t, r)
},
)
}

func getMarshalledResponse(t *testing.T, r []byte) string {
type marshallJsonError struct {
Code string `json:"code"`
Message string `json:"message"`
Severity string `json:"severity"`
}
// Type for the result.
type marshalledTxnResult struct {
Columns interface{} `json:"columns,omitempty"`
End interface{} `json:"end"` // end timestamp.
Error *marshallJsonError `json:"error,omitempty"`
Rows interface{} `json:"rows,omitempty"`
RowsAffected int `json:"rows_affected"`
Start interface{} `json:"start"` // start timestamp.
Statement int `json:"statement"` // index of statement in request.
Tag string `json:"tag"` // SQL statement tag.
}
type marshalledExecResult struct {
Retries int `json:"retries,omitempty"`
TxnResults []marshalledTxnResult `json:"txn_results"`
}

type marshalledResponse struct {
Error *marshallJsonError `json:"error,omitempty"`
Execution *marshalledExecResult `json:"execution,omitempty"`
NumStatements int `json:"num_statements,omitempty"`
Request interface{} `json:"request,omitempty"`
}

var u = &marshalledResponse{}
err := json.Unmarshal(r, u)
require.NoError(t, err)
s, err := json.MarshalIndent(u, "", " ")
require.NoError(t, err)
return string(s)
}
Loading

0 comments on commit 6fabd1d

Please sign in to comment.