94355: pgwire: support decoding binary regXXX types r=rafiss a=jordanlewis

Fixes #94353

Epic: None

Release note (bug fix): support receiving regXXX type values in binary extended protocol
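
A minimal sketch of the round-trip this enables, in Go with the jackc/pgx/v5 driver (which uses the extended protocol); the connection URL is a placeholder, and it is my assumption — not stated in the PR — that the driver encodes a regtype parameter in binary as a 4-byte OID:

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Placeholder connection string for a local insecure node.
	conn, err := pgx.Connect(ctx, "postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// The cast makes the server describe $1 as regtype, so the driver sends
	// it in binary wire format (a 4-byte OID). Before this fix the server
	// could not decode such a binary regXXX value; now it round-trips.
	var name string
	if err := conn.QueryRow(ctx, "SELECT $1::regtype::text", uint32(23)).Scan(&name); err != nil {
		panic(err)
	}
	fmt.Println("OID 23 resolves to type:", name) // e.g. "integer" / "int4"
}
```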

94372: roachtest: change fingerprinting builtin in c2c roachtest r=msbutler,stevendanna a=adityamaru

If no rangekeys are encountered during
fingerprinting, the underlying SSTWriter does not write
any data to the SST but still produces a non-empty
SST file. This file contains an empty data block, properties, and
a footer that amount to 795 bytes. Since the file does
not contain any rangekeys, it is thrown away
on the client side, so ferrying this file in the
ExportResponse is wasteful.

Experiments on the c2c/kv0 roachtest showed that fingerprinting
a single ~250MB range would result in 600k+ empty files. This magnitude
of pagination was entirely attributable to the elastic CPU limiter,
which preempts an ExportRequest if it is consuming more CPU than it
was allotted. 600k+ empty files meant that we were buffering close
to 500MB of ExportResponse_Files (600k * ~795 bytes) on the node
serving the ExportRequest. This was then shipped over gRPC to the
client, where opening a multi-mem iterator over these 600k+ empty
files caused a node with 15GiB RAM to OOM.

Since fingerprinting does not set a `TargetBytes` on the export requests
it sends, in a cluster with more than one range there could be several
requests concurrently buffering these empty files, resulting in an even
more pronounced memory blowup.

This change ensures that the MemFile is nil'ed out if no
rangekeys are encountered during fingerprinting.

This change also returns nil values for the `BulkOpSummary`
and `Span` fields of the `ExportResponse` when it is generated as
part of fingerprinting. These fields are not used and show
up in memory profiles of the c2c/kv0 roachtest.
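
The core of the first fix, condensed from the cmd_export.go diff further down in this commit:

```go
// If fingerprinting encountered no range keys, replace the buffered SST
// with a fresh empty MemFile so the ~795-byte empty file is freed rather
// than ferried back in the ExportResponse.
if !hasRangeKeys {
	destFile = &storage.MemFile{}
}
```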

Release note: None

Informs: #93078

95100: multitenant: support crdb_internal.list_sql_keys_in_range() for secondary tenants r=knz,arulajmani a=ecwall

Fixes #95006

The crdb_internal.list_sql_keys_in_range builtin now uses the RangeDescIterator
and scopes the scanned range span to the tenant's keyspace to prevent errors like:
```
ERROR: RangeIterator failed to seek to /Meta2/"\x00": rpc error: code = Unauthenticated desc = requested key /Meta2/"\x00" not fully contained in tenant keyspace /Tenant/1{2-3}
```
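
A self-contained sketch of the scoping idea (the `clampToTenant` helper and the string keys are illustrative stand-ins, not CockroachDB's actual RangeDescIterator API):

```go
package main

import "fmt"

// span is a simplified [start, end) key span.
type span struct{ start, end string }

// clampToTenant intersects a requested span with the tenant's keyspace so a
// secondary tenant never asks to iterate keys (such as /Meta2/...) outside
// its own /Tenant/<id> prefix. Hypothetical helper for illustration only.
func clampToTenant(req, tenant span) (span, bool) {
	s, e := req.start, req.end
	if s < tenant.start {
		s = tenant.start
	}
	if e > tenant.end {
		e = tenant.end
	}
	if s >= e {
		return span{}, false // nothing to scan inside the tenant keyspace
	}
	return span{s, e}, true
}

func main() {
	tenant := span{"/Tenant/12", "/Tenant/13"}
	req := span{"", "~"} // stands in for the unscoped [/Min, /Max) span
	if scoped, ok := clampToTenant(req, tenant); ok {
		fmt.Printf("iterating ranges in %s - %s\n", scoped.start, scoped.end)
	}
}
```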
    
----

Fixes #92072

The end key of the last tenant's last range is `/Max`, which lies outside
the tenant's keyspace.

This is fixed by adding a split at the end of the tenant's keyspace when the
tenant is created. For example, when tenant 10 is created, a split is added at
/Tenant/11.
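
A toy sketch of the split-key computation (the `tenantPrefix` helper is illustrative; the real prefix encoding lives in CockroachDB's pkg/keys):

```go
package main

import "fmt"

// tenantPrefix mimics deriving a tenant's keyspace prefix from its ID.
// Illustrative only; CockroachDB encodes this in pkg/keys.
func tenantPrefix(id uint64) string { return fmt.Sprintf("/Tenant/%d", id) }

func main() {
	const newTenant = 10
	// When tenant 10 is created, also split at the start of tenant 11's
	// keyspace so tenant 10's last range ends at /Tenant/11, not /Max.
	fmt.Println("split at:", tenantPrefix(newTenant+1)) // /Tenant/11
}
```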

Release note: None

95276: *: stop swallowing errors from privilege checks r=ecwall a=rafiss

sql: add HasPrivilege and HasAnyPrivilege to AuthorizationAccessor
*: stop swallowing errors from CheckPrivilegeForUser
*: stop swallowing errors from CheckAnyPrivilege
*: stop swallowing errors from CheckPrivilege
server: don't swallow error in hasGlobalPrivilege

Instead of swallowing errors, we use the new Has*Privilege functions. If
an error causes the transaction to abort, it's important
to propagate it.
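
In condensed form, the pattern that changes throughout this commit (as in the backup_planning.go diff below):

```go
// Before: a transaction-abort error from the privilege check is silently
// treated the same as "user lacks the privilege".
hasPriv := p.CheckPrivilegeForUser(ctx, desc, privilege.BACKUP, p.User()) == nil

// After: "no privilege" and "the check itself failed" are distinct, and the
// error is propagated instead of swallowed.
ok, err := p.HasPrivilege(ctx, desc, privilege.BACKUP, p.User())
if err != nil {
	return err
}
hasPriv = ok
```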

Epic: None
Release note: None

95482: sql: fix a race with finishing a span and executing RPCs r=yuzefovich a=yuzefovich

This commit fixes a race that was introduced when we fully parallelized the setup of remote flows. In particular, it became possible for a `setup-flow-async` "runner" tracing span to be finished before a concurrent `SetupFlow` RPC was issued, even though that RPC uses the span as the parent for its own RPC span. The issue only arises when we didn't spin up a "listener" goroutine for whatever reason (most likely because the server is quiescing), in which case we didn't block to wait for the concurrently-issued RPCs to complete. We canceled the context of those RPCs, but context cancellation is not noticed immediately by the RPC call (or at least it is noticed _after_ the gRPC interceptor code attempts to create a new tracing span). This is now fixed by blocking the cleanup until all concurrently-issued RPCs have completed.
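
A self-contained sketch of the synchronization fix (names and structure are illustrative, not the actual distsql runner code):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// setupFlowRPC stands in for a SetupFlow RPC that uses the parent tracing
// span; it may observe context cancellation only after it has started.
func setupFlowRPC(ctx context.Context, i int) {
	select {
	case <-ctx.Done():
	case <-time.After(10 * time.Millisecond):
	}
	fmt.Println("rpc", i, "returned")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			setupFlowRPC(ctx, i)
		}(i)
	}

	// Cleanup path: cancel outstanding RPCs, then block until every one has
	// returned before finishing the parent tracing span. Previously, on the
	// no-listener path, the span could be finished while an RPC was still
	// starting, racing with span use in the gRPC interceptor.
	cancel()
	wg.Wait()
	fmt.Println("all RPCs done; safe to finish the parent span")
}
```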

Fixes: #92809.

Release note: None

Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
6 people committed Jan 19, 2023
6 parents 942b55e + 7a4b6cc + 4734dec + 5cce084 + 6fb8cc2 + 4bde78b commit b6434c9
Showing 54 changed files with 851 additions and 374 deletions.
18 changes: 14 additions & 4 deletions pkg/ccl/backupccl/backup_planning.go
@@ -425,8 +425,13 @@ func checkPrivilegesForBackup(
for _, desc := range targetDescs {
switch desc := desc.(type) {
case catalog.DatabaseDescriptor:
hasRequiredBackupPrivileges = hasRequiredBackupPrivileges &&
p.CheckPrivilegeForUser(ctx, desc, privilege.BACKUP, p.User()) == nil
if hasRequiredBackupPrivileges {
if ok, err := p.HasPrivilege(ctx, desc, privilege.BACKUP, p.User()); err != nil {
return err
} else {
hasRequiredBackupPrivileges = ok
}
}
}
}
} else if backupStmt.Targets.Tables.TablePatterns != nil {
@@ -439,8 +444,13 @@ for _, desc := range targetDescs {
for _, desc := range targetDescs {
switch desc := desc.(type) {
case catalog.TableDescriptor:
hasRequiredBackupPrivileges = hasRequiredBackupPrivileges &&
p.CheckPrivilegeForUser(ctx, desc, privilege.BACKUP, p.User()) == nil
if hasRequiredBackupPrivileges {
if ok, err := p.HasPrivilege(ctx, desc, privilege.BACKUP, p.User()); err != nil {
return err
} else {
hasRequiredBackupPrivileges = ok
}
}
}
}
} else {
11 changes: 9 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
@@ -1215,7 +1215,9 @@ func checkRestoreDestinationPrivileges(
func checkRestorePrivilegesOnDatabase(
ctx context.Context, p sql.PlanHookState, parentDB catalog.DatabaseDescriptor,
) (shouldBufferNotice bool, err error) {
if err := p.CheckPrivilege(ctx, parentDB, privilege.RESTORE); err == nil {
if ok, err := p.HasPrivilege(ctx, parentDB, privilege.RESTORE, p.User()); err != nil {
return false, err
} else if ok {
return false, nil
}

@@ -1275,7 +1277,12 @@ func checkPrivilegesForRestore(
// error. In 22.2 we continue to check for old style privileges and role
// options.
if len(restoreStmt.Targets.Databases) > 0 {
hasRestoreSystemPrivilege := p.CheckPrivilegeForUser(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.RESTORE, p.User()) == nil
var hasRestoreSystemPrivilege bool
if ok, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.RESTORE, p.User()); err != nil {
return err
} else {
hasRestoreSystemPrivilege = ok
}
if hasRestoreSystemPrivilege {
return checkRestoreDestinationPrivileges(ctx, p, from)
}
4 changes: 2 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
@@ -322,12 +322,12 @@ txn_id txn_fingerprint_id query implicit_txn session_id start_time end_tim
query ITTI
SELECT range_id, start_pretty, end_pretty, lease_holder FROM crdb_internal.ranges
----
55 /Tenant/10 /Max 1
55 /Tenant/10 /Tenant/11 1

query ITT
SELECT range_id, start_pretty, end_pretty FROM crdb_internal.ranges_no_leases
----
55 /Tenant/10 /Max
55 /Tenant/10 /Tenant/11

query IT
SELECT zone_id, target FROM crdb_internal.zones ORDER BY 1
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

(Generated file; diff not rendered.)

@@ -30,6 +30,7 @@ state offset=47
/Table/5{3-4} database system (host)
/Tenant/10{-\x00} database system (tenant)
/Tenant/11{-\x00} database system (tenant)
/Tenant/12{-\x00} database system (tenant)

# Start the reconciliation loop for the secondary tenant.
reconcile tenant=10
@@ -127,6 +128,7 @@ state offset=47
/Tenant/10/Table/5{2-3} database system (tenant)
/Tenant/10/Table/5{3-4} database system (tenant)
/Tenant/11{-\x00} database system (tenant)
/Tenant/12{-\x00} database system (tenant)

exec-sql tenant=10
CREATE DATABASE db;
@@ -165,3 +167,4 @@ state offset=81
/Tenant/10/Table/11{2-3} range default
/Tenant/10/Table/11{3-4} range default
/Tenant/11{-\x00} database system (tenant)
/Tenant/12{-\x00} database system (tenant)
@@ -26,6 +26,7 @@ state offset=47
/Table/5{2-3} database system (host)
/Table/5{3-4} database system (host)
/Tenant/10{-\x00} database system (tenant)
/Tenant/11{-\x00} database system (tenant)

# Write a protected timestamp record on the system tenant cluster.
protect record-id=1 ts=1
@@ -46,6 +46,7 @@ state offset=47
/Table/5{3-4} database system (host)
/Tenant/10{-\x00} database system (tenant)
/Tenant/11{-\x00} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true
/Tenant/12{-\x00} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true

# Start the reconciliation loop for the tenant=10. It should have the vanilla
# RANGE DEFAULT. Check against the underlying KV state, the SQL view of the
@@ -105,6 +106,7 @@ state offset=47
/Tenant/10/Table/5{2-3} database system (tenant)
/Tenant/10/Table/5{3-4} database system (tenant)
/Tenant/11{-\x00} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true
/Tenant/12{-\x00} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true

query-sql tenant=10
SHOW ZONE CONFIGURATION FOR RANGE DEFAULT
@@ -192,6 +194,7 @@ state offset=81
/Tenant/11/Table/5{1-2} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true
/Tenant/11/Table/5{2-3} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true
/Tenant/11/Table/5{3-4} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true
/Tenant/12{-\x00} ttl_seconds=18000 ignore_strict_gc=true rangefeed_enabled=true

query-sql tenant=11
SHOW ZONE CONFIGURATION FOR RANGE DEFAULT
7 changes: 5 additions & 2 deletions pkg/cloud/cloudprivilege/privileges.go
@@ -44,8 +44,11 @@ func CheckDestinationPrivileges(ctx context.Context, p sql.PlanHookState, to []s
// Check if the destination requires the user to be an admin or have the
// `EXTERNALIOIMPLICITACCESS` privilege.
requiresImplicitAccess := !conf.AccessIsWithExplicitAuth()
hasImplicitAccessPrivilege :=
p.CheckPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.EXTERNALIOIMPLICITACCESS) == nil
hasImplicitAccessPrivilege, privErr :=
p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.EXTERNALIOIMPLICITACCESS, p.User())
if privErr != nil {
return privErr
}
if requiresImplicitAccess && !(p.ExecCfg().ExternalIODirConfig.EnableNonAdminImplicitAndArbitraryOutbound || hasImplicitAccessPrivilege) {
return pgerror.Newf(
pgcode.InsufficientPrivilege,
61 changes: 43 additions & 18 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm/local"
@@ -159,6 +160,10 @@ type c2cMetrics struct {
cutoverStart sizeTime

cutoverEnd sizeTime

fingerprintingStart time.Time

fingerprintingEnd time.Time
}

func (m c2cMetrics) export() map[string]exportedMetric {
@@ -427,7 +432,7 @@ func registerClusterToCluster(r registry.Registry) {
})

cutoverTime := chooseCutover(t, setup.dst.sql, workloadDuration, sp.cutover)
t.Status("cutover time chosen: %s", cutoverTime.String())
t.Status(fmt.Sprintf("cutover time chosen: %s", cutoverTime.String()))

// TODO(ssd): The job doesn't record the initial
// statement time, so we can't correctly measure the
@@ -446,23 +451,19 @@
m.Wait()

t.Status("waiting for replication stream to cutover")
retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.kvNodes)
stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.kvNodes)

t.Status("comparing fingerprints")
// Currently, it takes about 15 minutes to generate a fingerprint for
// about 30 GB of data. Once the fingerprinting job is used instead,
// this should only take about 5 seconds for the same amount of data. At
// that point, we should increase the number of warehouses in this test.
//
// The new fingerprinting job currently OOMs this test. Once it becomes
// more efficient, it will be used.
compareTenantFingerprintsAtTimestamp(
t,
m,
setup,
hlc.Timestamp{WallTime: cutoverTime.UnixNano()})
retainedTime,
cutoverTime,
)
lv.assertValid(t)

// TODO(msbutler): export metrics to roachperf or prom/grafana
@@ -492,24 +493,37 @@ func chooseCutover(
}

func compareTenantFingerprintsAtTimestamp(
t test.Test, m cluster.Monitor, setup *c2cSetup, ts hlc.Timestamp,
t test.Test, m cluster.Monitor, setup *c2cSetup, startTime, endTime time.Time,
) {
t.Status(fmt.Sprintf("comparing tenant fingerprints between start time %s and end time %s", startTime.UTC(), endTime.UTC()))

// TODO(adityamaru,lidorcarmel): Once we agree on the format and precision we
// display all user facing timestamps with, we should revisit how we format
// the start time to ensure we are fingerprinting from the most accurate lower
// bound.
microSecondRFC3339Format := "2006-01-02 15:04:05.999999"
startTimeStr := startTime.Format(microSecondRFC3339Format)
aost := hlc.Timestamp{WallTime: endTime.UnixNano()}.AsOfSystemTime()
fingerprintQuery := fmt.Sprintf(`
SELECT
xor_agg(
fnv64(crdb_internal.trim_tenant_prefix(key),
substring(value from 5))
) AS fingerprint
FROM crdb_internal.scan(crdb_internal.tenant_span($1::INT))
AS OF SYSTEM TIME '%s'`, ts.AsOfSystemTime())
SELECT *
FROM crdb_internal.fingerprint(crdb_internal.tenant_span($1::INT), '%s'::TIMESTAMPTZ, true)
AS OF SYSTEM TIME '%s'`, startTimeStr, aost)

var srcFingerprint int64
m.Go(func(ctx context.Context) error {
setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
return nil
})
var destFingerprint int64
setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
m.Go(func(ctx context.Context) error {
// TODO(adityamaru): Measure and record fingerprinting throughput.
setup.metrics.fingerprintingStart = timeutil.Now()
setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
setup.metrics.fingerprintingEnd = timeutil.Now()
fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String()
t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration)
return nil
})

// If the goroutine gets cancelled or fataled, return before comparing fingerprints.
require.NoError(t, m.WaitE())
@@ -573,6 +587,17 @@ func waitForHighWatermark(t test.Test, db *gosql.DB, ingestionJobID int, wait ti
}, wait)
}

// getReplicationRetainedTime returns the `retained_time` of the replication
// job.
func getReplicationRetainedTime(
t test.Test, destSQL *sqlutils.SQLRunner, destTenantName roachpb.TenantName,
) time.Time {
var retainedTime time.Time
destSQL.QueryRow(t, `SELECT retained_time FROM [SHOW TENANT $1 WITH REPLICATION STATUS]`,
destTenantName).Scan(&retainedTime)
return retainedTime
}

func stopReplicationStream(
t test.Test, destSQL *sqlutils.SQLRunner, ingestionJob int, cutoverTime time.Time,
) {
38 changes: 0 additions & 38 deletions pkg/kv/kvclient/scan_meta.go
@@ -38,41 +38,3 @@ func ScanMetaKVs(ctx context.Context, txn *kv.Txn, span roachpb.Span) ([]kv.KeyV
}
return kvs, nil
}

// GetRangeWithID returns the RangeDescriptor with the requested id, or nil if
// no such range exists. Note that it performs a scan over the meta.
func GetRangeWithID(
ctx context.Context, txn *kv.Txn, id roachpb.RangeID,
) (*roachpb.RangeDescriptor, error) {
// Scan the range meta K/V's to find the target range. We do this in a
// chunk-wise fashion to avoid loading all ranges into memory.
var ranges []kv.KeyValue
var err error
var rangeDesc roachpb.RangeDescriptor
const chunkSize = 100
metaStart := keys.RangeMetaKey(keys.MustAddr(keys.MinKey).Next())
metaEnd := keys.MustAddr(keys.Meta2Prefix.PrefixEnd())
for {
// Scan a batch of ranges.
ranges, err = txn.Scan(ctx, metaStart, metaEnd, chunkSize)
if err != nil {
return nil, err
}
// If no results were returned, then exit.
if len(ranges) == 0 {
break
}
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return nil, err
}
// Look for a range that matches the target range ID.
if rangeDesc.RangeID == id {
return &rangeDesc, nil
}
}
// Set the next starting point to after the last key in this batch.
metaStart = keys.MustAddr(ranges[len(ranges)-1].Key.Next())
}
return nil, nil
}
60 changes: 46 additions & 14 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -160,6 +160,14 @@ func evalExport(
resumeKeyTS = args.ResumeKeyTS
}

maybeAnnotateExceedMaxSizeError := func(err error) error {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
return errors.WithHintf(err,
"consider increasing cluster setting %q", MaxExportOverageSetting)
}
return err
}

var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
@@ -186,16 +194,26 @@
StripTenantPrefix: true,
StripValueChecksum: true,
}
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
var hasRangeKeys bool
summary, resume, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx,
cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
if err != nil {
return result.Result{}, maybeAnnotateExceedMaxSizeError(err)
}

// If no range keys were encountered during fingerprinting then we zero
// out the underlying SST file as there is no use in sending an empty file
// part of the ExportResponse. This frees up the memory used by the empty
// SST file.
if !hasRangeKeys {
destFile = &storage.MemFile{}
}
} else {
summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
}
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
"consider increasing cluster setting %q", MaxExportOverageSetting)
summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader,
opts, destFile)
if err != nil {
return result.Result{}, maybeAnnotateExceedMaxSizeError(err)
}
return result.Result{}, err
}
data := destFile.Data()

Expand All @@ -222,12 +240,26 @@ func evalExport(
} else {
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
Fingerprint: fingerprint,

var exported roachpb.ExportResponse_File
if args.ExportFingerprint {
// A fingerprinting ExportRequest does not need to return the
// BulkOpSummary or the exported Span. This is because we do not expect
// the sender of a fingerprint ExportRequest to use anything but the
// `Fingerprint` for point-keys and the SST file that contains the
// rangekeys we encountered during ExportRequest evaluation.
exported = roachpb.ExportResponse_File{
EndKeyTS: resume.Timestamp,
SST: data,
Fingerprint: fingerprint,
}
} else {
exported = roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
}
}
reply.Files = append(reply.Files, exported)
start = resume.Key