Skip to content

Commit

Permalink
sql: audit all usages of QueryEx to use iterator pattern
Browse files Browse the repository at this point in the history
This commit audits the usage of `QueryEx` method of the internal
executor in the following manner:
- if the caller only needed to execute the statement, `ExecEx` is now
used
- if the query can return at most one row, then `QueryRowEx` is now used
- if the caller can be refactored to use the iterator pattern, it is
done so.

As a result, almost all usages have been refactored (most notably the
virtual `crdb_internal.jobs` table now uses the iterator pattern, thus
aleviating OOM concerns - the ad-hoc memory accounting logic has been
removed).

`QueryEx` has been renamed to `QueryBufferedEx` to highlight that the
full buffering occurs, and it was added to `sqlutil.InternalExecutor`
interface. The method is now used only in three places.

Release justification: bug fix.

Release note (bug fix): `crdb_internal.jobs` virtual table is now
populated in a paginated fashion, thus, alleviating memory related
concerns when previously we could encounter OOM crash.
  • Loading branch information
yuzefovich committed Feb 26, 2021
1 parent 7df2237 commit dbc8676
Show file tree
Hide file tree
Showing 18 changed files with 339 additions and 248 deletions.
20 changes: 5 additions & 15 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@ func (r *Registry) runJob(

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
it, err := r.ex.QueryIteratorEx(
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
// registry).
rows, err := r.ex.QueryBufferedEx(
ctx, "cancel/pause-requested", txn, sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
UPDATE system.jobs
SET status =
Expand All @@ -283,20 +287,6 @@ RETURNING id, status`,
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
// registry).
// TODO(yuzefovich): use QueryBufferedEx method once it is added to
// sqlutil.InternalExecutor interface.
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
for _, row := range rows {
id := jobspb.JobID(*row[0].(*tree.DInt))
job := &Job{id: id, registry: r}
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,17 @@ func (ie *wrappedInternalExecutor) QueryRowExWithCols(
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryBufferedEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) ([]tree.Datums, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryIterator(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (sqlutil.InternalRows, error) {
Expand Down
Loading

0 comments on commit dbc8676

Please sign in to comment.