Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: implement spill disk for cursorFetch result #45163

Merged
merged 14 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parser/mysql/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ var MySQLErrName = map[uint16]*ErrMessage{
ErrKeyRefDoNotMatchTableRef: Message("Key reference and table reference don't match", nil),
ErrOperandColumns: Message("Operand should contain %d column(s)", nil),
ErrSubqueryNo1Row: Message("Subquery returns more than 1 row", nil),
ErrUnknownStmtHandler: Message("Unknown prepared statement handler (%.*s) given to %s", nil),
ErrUnknownStmtHandler: Message("Unknown prepared statement handler %s given to %s", nil),
ErrCorruptHelpDB: Message("Help database is corrupt or does not exist", nil),
ErrCyclicReference: Message("Cyclic reference on subqueries", nil),
ErrAutoConvert: Message("Converting column '%s' from %s to %s", nil),
Expand Down
65 changes: 31 additions & 34 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2270,7 +2270,12 @@ func (cc *clientConn) writeResultSet(ctx context.Context, rs ResultSet, binary b
cc.initResultEncoder(ctx)
defer cc.rsEncoder.Clean()
if mysql.HasCursorExistsFlag(serverStatus) {
if err := cc.writeChunksWithFetchSize(ctx, rs, serverStatus, fetchSize); err != nil {
crs, ok := rs.(cursorResultSet)
if !ok {
// this branch is actually unreachable
return false, errors.New("this cursor is not a resultSet")
}
if err := cc.writeChunksWithFetchSize(ctx, crs, serverStatus, fetchSize); err != nil {
return false, err
}
return false, cc.flush(ctx)
Expand Down Expand Up @@ -2404,43 +2409,27 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// binary specifies the way to dump data. It throws any error while dumping data.
// serverStatus, a flag bit represents server information.
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error {
fetchedRows := rs.GetFetchedRows()

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if len(fetchedRows) == 0 {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
return cc.writeEOF(ctx, serverStatus)
}

// construct the rows sent to the client according to fetchSize.
var curRows []chunk.Row
if fetchSize < len(fetchedRows) {
curRows = fetchedRows[:fetchSize]
fetchedRows = fetchedRows[fetchSize:]
} else {
curRows = fetchedRows
fetchedRows = fetchedRows[:0]
}
rs.StoreFetchedRows(fetchedRows)

func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs cursorResultSet, serverStatus uint16, fetchSize int) error {
var (
stmtDetail *execdetails.StmtExecDetails
err error
start time.Time
)
data := cc.alloc.AllocWithLen(4, 1024)
var stmtDetail *execdetails.StmtExecDetails
stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
//nolint:forcetypeassert
stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
}
var (
err error
start time.Time
)
if stmtDetail != nil {
start = time.Now()
}
for _, row := range curRows {

iter := rs.GetRowContainerReader()
// send the rows to the client according to fetchSize.
for i := 0; i < fetchSize && iter.Current() != iter.End(); i++ {
row := iter.Current()

data = data[0:4]
data, err = dumpBinaryRow(data, rs.Columns(), row, cc.rsEncoder)
if err != nil {
Expand All @@ -2449,16 +2438,24 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
if err = cc.writePacket(data); err != nil {
return err
}

iter.Next()
}
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
if iter.Error() != nil {
return iter.Error()
}

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if iter.Current() == iter.End() {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
}

if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
YangKeao marked this conversation as resolved.
Show resolved Hide resolved
}
if stmtDetail != nil {
start = time.Now()
}

err = cc.writeEOF(ctx, serverStatus)
if stmtDetail != nil {
stmtDetail.WriteSQLRespDuration += time.Since(start)
Expand Down
166 changes: 134 additions & 32 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
Expand All @@ -53,13 +54,17 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/server/internal/dump"
"github.com/pingcap/tidb/server/internal/parse"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

func (cc *clientConn) handleStmtPrepare(ctx context.Context, sql string) error {
Expand Down Expand Up @@ -201,7 +206,11 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
}

err = parse.ExecArgs(cc.ctx.GetSessionVars().StmtCtx, args, stmt.BoundParams(), nullBitmaps, stmt.GetParamsType(), paramValues, cc.inputDecoder)
stmt.Reset()
// This `.Reset` resets the arguments, so it's fine to just ignore the error (and the it'll be reset again in the following routine)
errReset := stmt.Reset()
if errReset != nil {
logutil.Logger(ctx).Warn("fail to reset statement in EXECUTE command", zap.Error(errReset))
}
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
Expand Down Expand Up @@ -263,6 +272,26 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
PrepStmt: prepStmt,
}

// first, try to clear the left cursor if there is one
if useCursor && stmt.GetCursorActive() {
if stmt.GetResultSet() != nil && stmt.GetResultSet().GetRowContainerReader() != nil {
stmt.GetResultSet().GetRowContainerReader().Close()
}
if stmt.GetRowContainer() != nil {
stmt.GetRowContainer().GetMemTracker().Detach()
stmt.GetRowContainer().GetDiskTracker().Detach()
err := stmt.GetRowContainer().Close()
if err != nil {
logutil.Logger(ctx).Error(
"Fail to close rowContainer before executing statement. May cause resource leak",
zap.Error(err))
}
stmt.StoreRowContainer(nil)
}
stmt.StoreResultSet(nil)
stmt.SetCursorActive(false)
}

// For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the
// TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here
// instead.
Expand Down Expand Up @@ -304,42 +333,83 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
// we should hold the ResultSet in PreparedStatement for next stmt_fetch, and only send back ColumnInfo.
// Tell the client cursor exists in server by setting proper serverStatus.
if useCursor {
crs := wrapWithCursor(rs)

cc.initResultEncoder(ctx)
defer cc.rsEncoder.Clean()
// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
// the rows directly to avoid running executor and accessing shared params/variables in the session
// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
// but the rows are still needed in the following FETCH command.
//
// TODO: trace the memory used here
chk := rs.NewChunk(nil)
var rows []chunk.Row

// create the row container to manage spill
// this `rowContainer` will be released when the statement (or the connection) is closed.
rowContainer := chunk.NewRowContainer(crs.FieldTypes(), vars.MaxChunkSize)
rowContainer.GetMemTracker().AttachTo(vars.MemTracker)
rowContainer.GetMemTracker().SetLabel(memory.LabelForCursorFetch)
rowContainer.GetDiskTracker().AttachTo(vars.DiskTracker)
rowContainer.GetDiskTracker().SetLabel(memory.LabelForCursorFetch)
if variable.EnableTmpStorageOnOOM.Load() {
failpoint.Inject("testCursorFetchSpill", func(val failpoint.Value) {
if val, ok := val.(bool); val && ok {
actionSpill := rowContainer.ActionSpillForTest()
defer actionSpill.WaitForTest()
}
})
action := memory.NewActionWithPriority(rowContainer.ActionSpill(), memory.DefCursorFetchSpillPriority)
vars.MemTracker.FallbackOldAndSetNewAction(action)
}
defer func() {
if err != nil {
rowContainer.GetMemTracker().Detach()
rowContainer.GetDiskTracker().Detach()
errCloseRowContainer := rowContainer.Close()
if errCloseRowContainer != nil {
logutil.Logger(ctx).Error("Fail to close rowContainer in error handler. May cause resource leak",
zap.NamedError("original-error", err), zap.NamedError("close-error", errCloseRowContainer))
}
}
}()

for {
if err = rs.Next(ctx, chk); err != nil {
chk := crs.NewChunk(nil)

if err = crs.Next(ctx, chk); err != nil {
return false, err
}
rowCount := chk.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
row := chk.GetRow(i)
rows = append(rows, row)

err = rowContainer.Add(chk)
if err != nil {
return false, err
}
chk = chunk.Renew(chk, vars.MaxChunkSize)
}
rs.StoreFetchedRows(rows)

stmt.StoreResultSet(rs)
if err = cc.writeColumnInfo(rs.Columns()); err != nil {
return false, err
}
if cl, ok := rs.(fetchNotifier); ok {
reader := chunk.NewRowContainerReader(rowContainer)
crs.StoreRowContainerReader(reader)
stmt.StoreResultSet(crs)
stmt.StoreRowContainer(rowContainer)
if cl, ok := crs.(fetchNotifier); ok {
cl.OnFetchReturned()
}

stmt.SetCursorActive(true)
defer func() {
if err != nil {
reader.Close()

// the resultSet and rowContainer have been closed in former "defer" statement.
stmt.StoreResultSet(nil)
stmt.StoreRowContainer(nil)
stmt.SetCursorActive(false)
}
}()

if err = cc.writeColumnInfo(crs.Columns()); err != nil {
return false, err
}

// explicitly flush columnInfo to client.
err = cc.writeEOF(ctx, cc.ctx.Status())
Expand All @@ -361,6 +431,12 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
cc.ctx.GetSessionVars().ClearAlloc(nil, false)
cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
defer cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, false)
// Reset the warn count. TODO: consider whether it's better to reset the whole session context/statement context.
if cc.ctx.GetSessionVars().StmtCtx != nil {
cc.ctx.GetSessionVars().StmtCtx.SetWarnings(nil)
}
cc.ctx.GetSessionVars().SysErrorCount = 0
cc.ctx.GetSessionVars().SysWarningCount = 0

stmtID, fetchSize, err := parse.StmtFetchCmd(data)
if err != nil {
Expand All @@ -372,6 +448,21 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID))
}
if !stmt.GetCursorActive() {
return errors.Annotate(mysql.NewErr(mysql.ErrSpCursorNotOpen), cc.preparedStmt2String(stmtID))
}
// from now on, we have made sure: the statement has an active cursor
// then if facing any error, this cursor should be reset
defer func() {
if err != nil {
errReset := stmt.Reset()
if errReset != nil {
logutil.Logger(ctx).Error("Fail to reset statement in error handler. May cause resource leak.",
zap.NamedError("original-error", err), zap.NamedError("reset-error", errReset))
}
}
}()

if topsqlstate.TopSQLEnabled() {
prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
if prepareObj != nil && prepareObj.SQLDigest != nil {
Expand All @@ -384,23 +475,22 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
rs := stmt.GetResultSet()
if rs == nil {
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
}

sendingEOF := false
// if the `fetchedRows` are empty before writing result, we could say the `FETCH` command will send EOF
if len(rs.GetFetchedRows()) == 0 {
sendingEOF = true
}
_, err = cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), int(fetchSize))
// if the iterator reached the end before writing result, we could say the `FETCH` command will send EOF
if rs.GetRowContainerReader().Current() == rs.GetRowContainerReader().End() {
// also reset the statement when the cursor reaches the end
// don't overwrite the `err` in outer scope, to avoid redundant `Reset()` in `defer` statement (though, it's not
// a big problem, as the `Reset()` function call is idempotent.)
err := stmt.Reset()
if err != nil {
logutil.Logger(ctx).Error("Fail to reset statement when FETCH command reaches the end. May cause resource leak",
zap.NamedError("error", err))
}
}
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
if sendingEOF {
stmt.SetCursorActive(false)
}

return nil
}
Expand Down Expand Up @@ -437,6 +527,10 @@ func (cc *clientConn) handleStmtSendLongData(data []byte) (err error) {
}

func (cc *clientConn) handleStmtReset(ctx context.Context, data []byte) (err error) {
// A reset command should reset the statement to the state when it was right after prepare
// Then the following state should be cleared:
// 1.The opened cursor, including the rowContainer (and its cursor/memTracker).
// 2.The argument sent through `SEND_LONG_DATA`.
if len(data) < 4 {
return mysql.ErrMalformPacket
}
Expand All @@ -447,8 +541,16 @@ func (cc *clientConn) handleStmtReset(ctx context.Context, data []byte) (err err
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.Itoa(stmtID), "stmt_reset")
}
stmt.Reset()
stmt.StoreResultSet(nil)
err = stmt.Reset()
if err != nil {
// Both server and client cannot handle the error case well, so just left an error and return OK.
// It's fine to receive further `EXECUTE` command even the `Reset` function call failed.
logutil.Logger(ctx).Error("Fail to close statement in error handler of RESET command. May cause resource leak",
zap.NamedError("original-error", err), zap.NamedError("close-error", err))

return cc.writeOK(ctx)
}

return cc.writeOK(ctx)
}

Expand Down
Loading