Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
59339: sql: audit all usages of Query to use iterator pattern r=yuzefovich a=yuzefovich

Similarly to the previous commit (dbc8676), here we audit all usages of `Query`
method of the internal executor to take advantage of the iterator API
wherever possible (or switching to `Exec` or `QueryRow`).
`QueryBuffered` has been added to the interface too.

The only place where it would be beneficial to use the iterator pattern
but it is not done currently is for `SHOW STATISTICS` statement - in
there, we have a panic-catcher which works only on the assumption of not
updating any of the shared state (which the iterator API contradicts).
Refactoring that part is left as a TODO.

Fixes: #48595.

Release justification: low-risk update to existing functionality.

Release note: None

60283: sql: improved diff tool for any namespace r=rafiss a=mnovelodou

Previously, diff tool worked only for pg_catalog
This was inadequate because it can be used for information_schema as well
To address this, this patch takes the namespace as parameter to compare a
different database

Release note: None
Release justification: non-production code changes

Fixes #58037

61263: roachtest: mark pgx prepared stmt test as passing r=otan a=rafiss

fixes #61250 

Release justification: testing only change

Release note: None

61304: ccl: test that local scan is planned for RBR table with computed region r=rytaft a=rytaft

This commit adds a test that a local scan is planned for a `REGIONAL BY ROW`
table with a computed region column.

Informs #57722

Release justification: non-production code changes.

Release note: None

61321: sql: fix tracing of postqueries r=yuzefovich a=yuzefovich

When we're performing tracing, we derive a special `consumeCtx` when
creating a DistSQLReceiver for the whole plan. That context should be
used for all components of the physical plan (main query, sub- and
post-queries), and the span needs to be finished by calling the stored
`cleanup` function. Previously, this was called in `ProducerDone` of the
main query which resulted in the span being finished before the
post-queries are run. As a result, the tracing spans for cascades and
checks could be incomplete.

This commit fixes the problem by delaying the finish of the span until
the DistSQLReceiver is released (since it is a convenient place that all
callers of `MakeDistSQLReceiver` call in a deferred invocation).

Release justification: bug fix.

Release note (bug fix): Previously, the traces of cascades and checks
could be incomplete, and now it is fixed.

61348: sql: fixing information_schema.columns.is_identity r=rafiss a=mnovelodou

Previously, information_schema.columns.is_identity was set to null
This was inadequate because is incorrect value
To address this, this patch sets column to NO as identity columns
are not yet supported

Release justification: bug fixes and low-risk updates to new functionality
Release note (sql change): fixed information_schema.columns.is_identity to
display the correct value

Fixes #61011

61375: kvserver: add a log.Scope r=andreimatei a=andreimatei

Release note: None
Release justification: Test only.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: MiguelNovelo <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
6 people committed Mar 3, 2021
8 parents e1e5d5a + 10ede95 + f95040f + ad69471 + a4c77ac + cb3e0aa + 0ae5cb7 + 176cbfd commit 4ab29d4
Show file tree
Hide file tree
Showing 37 changed files with 6,176 additions and 184 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func backupPlanHook(

// Include all tenants.
// TODO(tbg): make conditional on cluster setting.
tenantRows, err = p.ExecCfg().InternalExecutor.Query(
tenantRows, err = p.ExecCfg().InternalExecutor.QueryBuffered(
ctx, "backup-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT id, active, info FROM system.tenants`,
)
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,17 @@ INSERT INTO regional_by_row_table_as (pk, a, b) VALUES (30, 1, 1)
statement error pq: duplicate key value violates unique constraint "regional_by_row_table_as_b_key"\nDETAIL: Key \(b\)=\(1\) already exists\.
INSERT INTO regional_by_row_table_as (pk, a, b) VALUES (2, 1, 1)

# Verify that we plan single-region scans for REGIONAL BY ROW tables with a computed region.
query T
EXPLAIN SELECT * FROM regional_by_row_table_as WHERE pk = 10
----
distribution: local
vectorized: true
·
• scan
missing stats
table: regional_by_row_table_as@primary
spans: [/'us-east-1'/10 - /'us-east-1'/10]

# Tests for altering the survivability of a REGIONAL BY ROW table.
statement ok
Expand Down
16 changes: 10 additions & 6 deletions pkg/cli/initial_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// runInitialSQL concerns itself with running "initial SQL" code when
Expand Down Expand Up @@ -80,21 +81,24 @@ func createAdminUser(ctx context.Context, s *server.Server, adminUser, adminPass
// given server object.
func cliDisableReplication(ctx context.Context, s *server.Server) error {
return s.RunLocalSQL(ctx,
func(ctx context.Context, ie *sql.InternalExecutor) error {
rows, err := ie.Query(ctx, "get-zones", nil,
func(ctx context.Context, ie *sql.InternalExecutor) (retErr error) {
it, err := ie.QueryIterator(ctx, "get-zones", nil,
"SELECT target FROM crdb_internal.zones")
if err != nil {
return err
}
// We have to make sure to close the iterator since we might return
// from the for loop early (before Next() returns false).
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

for _, row := range rows {
zone := string(*row[0].(*tree.DString))
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
zone := string(*it.Cur()[0].(*tree.DString))
if _, err := ie.Exec(ctx, "set-zone", nil,
fmt.Sprintf("ALTER %s CONFIGURE ZONE USING num_replicas = 1", zone)); err != nil {
return err
}
}

return nil
return err
})
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "generate-pg-catalog_lib",
name = "generate-postgres-metadata-tables_lib",
srcs = ["main.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/cmd/generate-pg-catalog",
importpath = "github.com/cockroachdb/cockroach/pkg/cmd/generate-postgres-metadata-tables",
visibility = ["//visibility:private"],
deps = [
"//pkg/sql",
Expand All @@ -12,7 +12,7 @@ go_library(
)

go_binary(
name = "generate-pg-catalog",
embed = [":generate-pg-catalog_lib"],
name = "generate-postgres-metadata-tables",
embed = [":generate-postgres-metadata-tables_lib"],
visibility = ["//visibility:public"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ import (
const getServerVersion = `SELECT current_setting('server_version');`

var (
postgresAddr = flag.String("addr", "localhost:5432", "Postgres server address")
postgresUser = flag.String("user", "postgres", "Postgres user")
postgresAddr = flag.String("addr", "localhost:5432", "Postgres server address")
postgresUser = flag.String("user", "postgres", "Postgres user")
postgresSchema = flag.String("catalog", "pg_catalog", "Catalog or namespace, default: pg_catalog")
)

func main() {
flag.Parse()
db := connect()
defer closeDB(db)
pgCatalogFile := &sql.PGCatalogFile{
PgVersion: getPGVersion(db),
PgCatalog: sql.PGCatalogTables{},
pgCatalogFile := &sql.PGMetadataFile{
PGVersion: getPGVersion(db),
PGMetadata: sql.PGMetadataTables{},
}

rows := describePgCatalog(db)
Expand All @@ -55,14 +56,14 @@ func main() {
if err := rows.Scan(&table, &column, &dataType, &dataTypeOid); err != nil {
panic(err)
}
pgCatalogFile.PgCatalog.AddColumnMetadata(table, column, dataType, dataTypeOid)
pgCatalogFile.PGMetadata.AddColumnMetadata(table, column, dataType, dataTypeOid)
}

pgCatalogFile.Save(os.Stdout)
}

func describePgCatalog(conn *pgx.Conn) *pgx.Rows {
rows, err := conn.Query(sql.GetPGCatalogSQL)
rows, err := conn.Query(sql.GetPGMetadataSQL, *postgresSchema)
if err != nil {
panic(err)
}
Expand Down
37 changes: 36 additions & 1 deletion pkg/cmd/roachtest/pgx_blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,42 @@ var pgxBlocklists = blocklistsForVersion{
// Please keep these lists alphabetized for easy diffing.
// After a failed run, an updated version of this blocklist should be available
// in the test log.
var pgxBlocklist21_1 = pgxBlocklist20_2
var pgxBlocklist21_1 = blocklist{
"v4.Example_CustomType": "27796",
"v4.TestConnBeginBatchDeferredError": "31632",
"v4.TestConnCopyFromLarge": "52722",
"v4.TestConnQueryDeferredError": "31632",
"v4.TestConnQueryErrorWhileReturningRows": "26925",
"v4.TestConnQueryReadRowMultipleTimes": "26925",
"v4.TestConnQueryValues": "26925",
"v4.TestConnSendBatch": "44712",
"v4.TestConnSimpleProtocol": "21286",
"v4.TestConnSimpleProtocolRefusesNonStandardConformingStrings": "36215",
"v4.TestConnSimpleProtocolRefusesNonUTF8ClientEncoding": "37129",
"v4.TestDomainType": "27796",
"v4.TestFatalRxError": "35897",
"v4.TestFatalTxError": "35897",
"v4.TestInetCIDRArrayTranscodeIP": "18846",
"v4.TestInetCIDRArrayTranscodeIPNet": "18846",
"v4.TestInetCIDRTranscodeIP": "18846",
"v4.TestInetCIDRTranscodeIPNet": "18846",
"v4.TestInetCIDRTranscodeWithJustIP": "18846",
"v4.TestLargeObjects": "26725",
"v4.TestLargeObjectsMultipleTransactions": "26725",
"v4.TestLargeObjectsPreferSimpleProtocol": "26725",
"v4.TestListenNotify": "41522",
"v4.TestListenNotifySelfNotification": "41522",
"v4.TestListenNotifyWhileBusyIsSafe": "41522",
"v4.TestQueryContextErrorWhileReceivingRows": "26925",
"v4.TestRowDecode": "26925",
"v4.TestTransactionSuccessfulCommit": "31632",
"v4.TestTransactionSuccessfulRollback": "31632",
"v4.TestTxCommitSerializationFailure": "12701",
"v4.TestTxCommitWhenTxBroken": "31632",
"v4.TestTxNestedTransactionCommit": "31632",
"v4.TestTxNestedTransactionRollback": "31632",
"v4.TestUnregisteredTypeUsableAsStringArgumentAndBaseResult": "27796",
}

var pgxBlocklist20_2 = blocklist{
"v4.Example_CustomType": "27796",
Expand Down
13 changes: 1 addition & 12 deletions pkg/jobs/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,14 @@ func (r *Registry) deprecatedMaybeAdoptJob(
SELECT id, payload, progress IS NULL, status
FROM system.jobs
WHERE status IN ($1, $2, $3, $4, $5) ORDER BY created DESC`
it, err := r.ex.QueryIterator(
rows, err := r.ex.QueryBuffered(
ctx, "adopt-job", nil /* txn */, stmt,
StatusPending, StatusRunning, StatusCancelRequested, StatusPauseRequested, StatusReverting,
)
if err != nil {
return errors.Wrap(err, "failed querying for jobs")
}

// TODO(yuzefovich): use QueryBuffered method once it is added to
// sqlutil.InternalExecutor interface.
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
if err != nil {
return errors.Wrap(err, "failed querying for jobs")
}

if randomizeJobOrder {
rand.Seed(timeutil.Now().UnixNano())
rand.Shuffle(len(rows), func(i, j int) { rows[i], rows[j] = rows[j], rows[i] })
Expand Down
7 changes: 1 addition & 6 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,7 @@ func (s *jobScheduler) executeSchedules(

// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() {
closeErr := it.Close()
if retErr == nil {
retErr = closeErr
}
}()
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

// The loop below might encounter an error after some schedules have been
// executed (i.e. previous iterations succeeded), and this is ok.
Expand Down
7 changes: 1 addition & 6 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,12 +919,7 @@ func (r *Registry) cleanupOldJobsPage(
}
// We have to make sure to close the iterator since we might return from the
// for loop early (before Next() returns false).
defer func() {
closeErr := it.Close()
if retErr == nil {
retErr = closeErr
}
}()
defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
toDelete := tree.NewDArray(types.Int)
oldMicros := timeutil.ToUnixMicros(olderThan)

Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,12 @@ func (ie *wrappedInternalExecutor) QueryRowExWithCols(
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryBuffered(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) ([]tree.Datums, error) {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) QueryBufferedEx(
ctx context.Context,
opName string,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func TestAddReplicaViaLearner(t *testing.T) {

func TestAddRemoveNonVotingReplicasBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
knobs, ltk := makeReplicationTestKnobs()
Expand Down
12 changes: 1 addition & 11 deletions pkg/migration/migrationmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,7 @@ SELECT id, status
if err != nil {
return false, 0, errors.Wrap(err, "failed to marshal version to JSON")
}
// TODO(yuzefovich): use QueryBuffered method once it is added to
// sqlutil.InternalExecutor interface.
it, err := m.ie.QueryIterator(ctx, "migration-manager-find-jobs", txn, query, jsonMsg.String())
if err != nil {
return false, 0, err
}
var rows []tree.Datums
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
rows, err := m.ie.QueryBuffered(ctx, "migration-manager-find-jobs", txn, query, jsonMsg.String())
if err != nil {
return false, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ go_library(
"partition.go",
"partition_utils.go",
"pg_catalog.go",
"pg_catalog_diff.go",
"pg_extension.go",
"pg_metadata_diff.go",
"plan.go",
"plan_batch.go",
"plan_columns.go",
Expand Down Expand Up @@ -449,7 +449,7 @@ go_test(
"namespace_test.go",
"old_foreign_key_desc_test.go",
"partition_test.go",
"pg_catalog_test.go",
"pg_metadata_test.go",
"pg_oid_test.go",
"pgwire_internal_test.go",
"plan_opt_test.go",
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,16 @@ func (p *planner) resolveMemberOfWithAdminOption(
}
visited[m] = struct{}{}

rows, err := p.ExecCfg().InternalExecutor.Query(
it, err := p.ExecCfg().InternalExecutor.QueryIterator(
ctx, "expand-roles", txn, lookupRolesStmt, m.Normalized(),
)
if err != nil {
return nil, err
}

for _, row := range rows {
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
roleName := tree.MustBeDString(row[0])
isAdmin := row[1].(*tree.DBool)

Expand All @@ -403,6 +405,9 @@ func (p *planner) resolveMemberOfWithAdminOption(
// We need to expand this role. Let the "pop" worry about already-visited elements.
toVisit = append(toVisit, role)
}
if err != nil {
return nil, err
}
}

return ret, nil
Expand Down
13 changes: 2 additions & 11 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -2081,19 +2081,10 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME
retryOptions.Closer = m.stopper.ShouldQuiesce()
// The retry is required because of errors caused by node restarts. Retry 30 times.
if err := retry.WithMaxAttempts(ctx, retryOptions, 30, func() error {
it, err := m.storage.internalExecutor.QueryIterator(
var err error
rows, err = m.storage.internalExecutor.QueryBuffered(
ctx, "read orphaned leases", nil /*txn*/, sqlQuery,
)
if err != nil {
return err
}
rows = rows[:0]
// TODO(yuzefovich): use QueryBuffered method once it is added to
// sqlutil.InternalExecutor interface.
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
rows = append(rows, it.Cur())
}
return err
}); err != nil {
log.Warningf(ctx, "unable to read orphaned leases: %+v", err)
Expand Down
Loading

0 comments on commit 4ab29d4

Please sign in to comment.