Commit

100678: cli: unskip TestUnavailableZip r=aadityasondhi a=aadityasondhi

cli: make debug zip resilient to single failures

This fixes a regression in debug zip where a timeout failure in a call to
the status server would abort the entire debug zip collection. Instead, we
want to keep trying all remaining endpoints.
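To illustrate the intent, here is a hedged sketch (standard library only; the `endpoint` type and `collectAll` function are hypothetical illustrations, not code from this commit — the actual change wraps each request with `contextutil.RunWithTimeout`, as shown in the `pkg/cli/zip.go` diff below): each request runs under its own deadline, and a failure is recorded rather than propagated, so the remaining endpoints are still attempted.

```go
package sketch

import (
	"context"
	"fmt"
	"time"
)

// endpoint describes one debug zip request (hypothetical descriptor).
type endpoint struct {
	name    string
	collect func(ctx context.Context) error
}

// collectAll gives each request its own deadline so that a single slow or
// unavailable node records an error but does not abort the whole collection.
func collectAll(ctx context.Context, endpoints []endpoint, timeout time.Duration) {
	for _, ep := range endpoints {
		reqCtx, cancel := context.WithTimeout(ctx, timeout)
		err := ep.collect(reqCtx)
		cancel()
		if err != nil {
			// Record the failure next to the endpoint's output and keep going.
			fmt.Printf("could not collect %s: %v\n", ep.name, err)
			continue
		}
		fmt.Printf("collected %s\n", ep.name)
	}
}
```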

----

cli: unskip TestUnavailableZip

Fixes: #53306

Release note: None

101892: colexecjoin: minor cleanup of the hash joiner r=yuzefovich a=yuzefovich

This commit adds the missing memory accounting for the `buildRowMatched` slice used by the hash joiner for FULL OUTER, RIGHT OUTER, RIGHT SEMI, and RIGHT ANTI joins. Unlike the other slices in the `probeState` struct, this slice is of non-constant size (i.e. it is not limited by `coldata.BatchSize()`), so it should be included in the accounting.

It also removes the redundant allocation of the `Visited` slice for LEFT ANTI joins, since that slice isn't used by that join type.

Additionally, this commit makes a few minor adjustments to the comments and reuses a utility method for slice allocation.
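For reference, the accounting pattern added for `buildRowMatched` (condensed from the `hashjoiner.go` diff below; it reuses the existing `colexecutils`, `memsize`, and allocator helpers):

```go
// Condensed from pkg/sql/colexec/colexecjoin/hashjoiner.go in this commit.
// buildRowMatched grows with the build side rather than being capped at
// coldata.BatchSize(), so its capacity is re-accounted after every
// (re)allocation; the slice never shrinks, so the delta is non-negative.
hj.probeState.buildRowMatched = colexecutils.MaybeAllocateBoolArray(
	hj.probeState.buildRowMatched, hj.ht.Vals.Length(),
)
newAccountedFor := memsize.Bool * int64(cap(hj.probeState.buildRowMatched))
allocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.buildRowMatched)
hj.accountedFor.buildRowMatched = newAccountedFor
```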

Epic: None

Release note: None

101920: logictest: remove a couple of redundant rowsort directives r=yuzefovich a=yuzefovich

Two queries have an ORDER BY clause that should already produce a unique ordering, which makes the rowsort directive redundant. Noticed while reviewing another change.

Epic: None

Release note: None

101928: sql: cap the tenant IDs that can be allocated via create_tenant r=yuzefovich a=knz

Requested by `@yuzefovich` in the review of #101845.

This patch introduces a limit (MaxUint32) to the fixed IDs that can be
selected by `crdb_internal.create_tenant`. This ensures that there are
always IDs remaining available for CREATE TENANT after the maximum
value has been selected via `create_tenant`.
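The enforcement is a single bound check in `CreateTenant` (condensed from the `pkg/sql/tenant_creation.go` diff below); a negative test is added in the `tenant_builtins` logictest further down.

```go
// Reject fixed IDs above MaxUint32 so the upper part of the tenant ID space
// stays available for IDs allocated by CREATE TENANT itself.
if ctcfg.ID != nil && *ctcfg.ID > math.MaxUint32 {
	return tid, pgerror.Newf(pgcode.ProgramLimitExceeded,
		"tenant ID %d out of range", *ctcfg.ID)
}
```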

Release note: None
Epic: CRDB-23559

Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
4 people committed Apr 20, 2023
5 parents ac03a55 + 95eaa0e + 1cd29de + 2fd89ca + 0f2502f commit fe19f93
Showing 9 changed files with 199 additions and 149 deletions.
156 changes: 85 additions & 71 deletions pkg/cli/testdata/zip/unavailable

Large diffs are not rendered by default.

45 changes: 21 additions & 24 deletions pkg/cli/zip.go
@@ -35,8 +35,6 @@ import (
"github.com/jackc/pgconn"
"github.com/marusama/semaphore"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// zipRequest abstracts a possible server API call to one of the API
@@ -144,6 +142,11 @@ func runDebugZip(_ *cobra.Command, args []string) (retErr error) {
return err
}

timeout := 10 * time.Second
if cliCtx.cmdTimeout != 0 {
timeout = cliCtx.cmdTimeout
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -163,24 +166,23 @@ func runDebugZip(_ *cobra.Command, args []string) (retErr error) {
}
defer finish()

resp, err := serverpb.NewAdminClient(conn).ListTenants(ctx, &serverpb.ListTenantsRequest{})
if err != nil {
if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
// For pre-v23.1 clusters, this endpoint is not implemented, proceed with
// only querying the system tenant.
resp, sErr := serverpb.NewStatusClient(conn).Details(ctx, &serverpb.DetailsRequest{NodeId: "local"})
if sErr != nil {
return s.fail(errors.CombineErrors(err, sErr))
}
tenants = append(tenants, &serverpb.Tenant{
TenantId: &roachpb.SystemTenantID,
TenantName: catconstants.SystemTenantName,
SqlAddr: resp.SQLAddress.String(),
RpcAddr: serverCfg.Addr,
})
} else {
return s.fail(err)
var resp *serverpb.ListTenantsResponse
if err := contextutil.RunWithTimeout(context.Background(), "list tenants", timeout, func(ctx context.Context) error {
resp, err = serverpb.NewAdminClient(conn).ListTenants(ctx, &serverpb.ListTenantsRequest{})
return err
}); err != nil {
// For pre-v23.1 clusters, this endpoint is not implemented, proceed with
// only querying the system tenant.
resp, sErr := serverpb.NewStatusClient(conn).Details(ctx, &serverpb.DetailsRequest{NodeId: "local"})
if sErr != nil {
return s.fail(errors.CombineErrors(err, sErr))
}
tenants = append(tenants, &serverpb.Tenant{
TenantId: &roachpb.SystemTenantID,
TenantName: catconstants.SystemTenantName,
SqlAddr: resp.SQLAddress.String(),
RpcAddr: serverCfg.Addr,
})
} else {
tenants = resp.Tenants
}
@@ -256,11 +258,6 @@ func runDebugZip(_ *cobra.Command, args []string) (retErr error) {
s.done()
}

timeout := 10 * time.Second
if cliCtx.cmdTimeout != 0 {
timeout = cliCtx.cmdTimeout
}

// Only add tenant prefix for non system tenants.
var prefix string
if tenant.TenantId.ToUint64() != roachpb.SystemTenantID.ToUint64() {
3 changes: 2 additions & 1 deletion pkg/cli/zip_cluster_wide.go
@@ -105,12 +105,13 @@ func (zc *debugZipContext) collectClusterData(

{
s := zc.clusterPrinter.start("requesting nodes")
var nodesStatus *serverpb.NodesResponse
err := zc.runZipFn(ctx, s, func(ctx context.Context) error {
nodesList, err = zc.status.NodesList(ctx, &serverpb.NodesListRequest{})
nodesStatus, err = zc.status.Nodes(ctx, &serverpb.NodesRequest{})
return err
})

nodesStatus, err := zc.status.Nodes(ctx, &serverpb.NodesRequest{})
if code := status.Code(errors.Cause(err)); code == codes.Unimplemented {
// running on non system tenant, use data from NodesList()
if cErr := zc.z.createJSONOrError(s, debugBase+"/nodes.json", nodesList, err); cErr != nil {
22 changes: 16 additions & 6 deletions pkg/cli/zip_test.go
@@ -258,7 +258,6 @@ create table defaultdb."../system"(x int);
// need the SSL certs dir to run a CLI test securely.
func TestUnavailableZip(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 53306, "flaky test")

skip.UnderShort(t)
// Race builds make the servers so slow that they report spurious
@@ -324,11 +323,7 @@ func TestUnavailableZip(t *testing.T) {

// Strip any non-deterministic messages.
out = eraseNonDeterministicZipOutput(out)

// In order to avoid non-determinism here, we erase the output of
// the range retrieval.
re := regexp.MustCompile(`(?m)^(requesting ranges.*found|writing: debug/nodes/\d+/ranges).*\n`)
out = re.ReplaceAllString(out, ``)
out = eraseNonDeterministicErrors(out)

datadriven.RunTest(t, datapathutils.TestDataPath(t, "zip", "unavailable"),
func(t *testing.T, td *datadriven.TestData) string {
@@ -371,6 +366,21 @@ func eraseNonDeterministicZipOutput(out string) string {
return out
}

func eraseNonDeterministicErrors(out string) string {
// In order to avoid non-determinism here, we erase the output of
// the range retrieval.
re := regexp.MustCompile(`(?m)^(requesting ranges.*found|writing: debug/nodes/\d+/ranges).*\n`)
out = re.ReplaceAllString(out, ``)

re = regexp.MustCompile(`(?m)^\[cluster\] requesting data for debug\/settings.*\n`)
out = re.ReplaceAllString(out, ``)

// In order to avoid non-determinism here, we truncate error messages.
re = regexp.MustCompile(`(?m)last request failed: .*$`)
out = re.ReplaceAllString(out, `last request failed: ...`)
return out
}

// This tests the operation of zip over partial clusters.
//
// We cannot combine this test with TestZip above because TestPartialZip
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexechash/hashtable.go
@@ -142,7 +142,7 @@ type hashTableProbeBuffer struct {
// tuples from the probing batch. Concretely, ToCheckID[i] is the keyID of
// the tuple in the hash table which we are currently comparing with the ith
// tuple of the probing batch. i is included in ToCheck. The result of the
// comparison is stored in 'differs' and/or 'distinct'.
// comparison is stored in 'differs' and/or 'foundNull'.
//
// On the first iteration:
// ToCheckID[i] = First[hash[i]]
@@ -819,8 +819,8 @@ func (ht *HashTable) buildNextChains(first, next []keyID, offset, batchSize uint
ht.cancelChecker.CheckEveryCall()
}

// SetupLimitedSlices ensures that HeadID, differs, distinct, ToCheckID, and
// ToCheck are of the desired length and are setup for probing.
// SetupLimitedSlices ensures that HeadID, differs, foundNull, ToCheckID, and
// ToCheck are of the desired length and are set up for probing.
// Note that if the old ToCheckID or ToCheck slices have enough capacity, they
// are *not* zeroed out.
func (p *hashTableProbeBuffer) SetupLimitedSlices(length int, buildMode HashTableBuildMode) {
93 changes: 53 additions & 40 deletions pkg/sql/colexec/colexecjoin/hashjoiner.go
@@ -195,8 +195,8 @@ type hashJoiner struct {

// probeState is used in hjProbing state.
probeState struct {
// buildIdx and probeIdx represents the matching row indices that are used to
// stitch together the join results.
// buildIdx and probeIdx represents the matching row indices that are
// used to stitch together the join results.
buildIdx []int
probeIdx []int

@@ -205,17 +205,22 @@
// be NULL on the build table. This indicates that the probe table row
// did not match any build table rows.
probeRowUnmatched []bool
// buildRowMatched is used in the case that spec.trackBuildMatches is true. This
// means that an outer join is performed on the build side and buildRowMatched
// marks all the build table rows that have been matched already. The rows
// that were unmatched are emitted during the hjEmittingRight phase.
// buildRowMatched is used in the case that spec.trackBuildMatches is
// true. This means that an outer join is performed on the build side
// and buildRowMatched marks all the build table rows that have been
// matched already. The rows that were unmatched are emitted during the
// hjEmittingRight phase.
//
// Note that this is the only slice in probeState of non-constant size
// (i.e. not limited by coldata.BatchSize() in capacity), so it's the
// only one we perform the memory accounting for.
buildRowMatched []bool

// buckets is used to store the computed hash value of each key in a single
// probe batch.
// buckets is used to store the computed hash value of each key in a
// single probe batch.
buckets []uint64
// prevBatch, if not nil, indicates that the previous probe input batch has
// not been fully processed.
// prevBatch, if not nil, indicates that the previous probe input batch
// has not been fully processed.
prevBatch coldata.Batch
// prevBatchResumeIdx indicates the index of the probe row to resume the
// collection from. It is used only in case of non-distinct build source
@@ -235,6 +240,9 @@
hashtableSame int64
// hashtableVisited tracks the current memory usage of hj.ht.Visited.
hashtableVisited int64
// buildRowMatched tracks the current memory usage of
// hj.probeState.buildRowMatched.
buildRowMatched int64
}

exportBufferedState struct {
@@ -328,50 +336,57 @@ func (hj *hashJoiner) Next() coldata.Batch {
func (hj *hashJoiner) build() {
hj.ht.FullBuild(hj.InputTwo)

// At this point, we have fully built the hash table on the right side
// (meaning we have fully consumed the right input), so it'd be a shame to
// fall back to disk, thus, we use the unlimited allocator.
allocator := hj.outputUnlimitedAllocator

// If we might have duplicates in the hash table (meaning that rightDistinct
// is false), we need to set up Same and Visited slices for the prober
// (depending on the join type).
var needSame bool
// Visited slice is always used for set-operation joins, regardless of
// the fact whether the right side is distinct.
needVisited := hj.spec.JoinType.IsSetOpJoin()

if !hj.spec.rightDistinct {
switch hj.spec.JoinType {
case descpb.LeftAntiJoin, descpb.ExceptAllJoin, descpb.IntersectAllJoin:
default:
// We don't need Same with LEFT ANTI, EXCEPT ALL, and INTERSECT ALL
// joins because they have a separate collectSingleMatch method.
hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1)
// At this point, we have fully built the hash table on the right
// side (meaning we have fully consumed the right input), so it'd be
// a shame to fallback to disk, thus, we use the unlimited
// allocator.
newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same))
// hj.ht.Same will never shrink, so the delta is non-negative.
hj.outputUnlimitedAllocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableSame)
hj.accountedFor.hashtableSame = newAccountedFor
needSame = true
// Visited isn't needed for LEFT ANTI joins (it's used by EXCEPT ALL
// and INTERSECT ALL, but those cases are handled above already).
needVisited = true
}
}
if !hj.spec.rightDistinct || hj.spec.JoinType.IsSetOpJoin() {
// Visited slice is also used for set-operation joins, regardless of
// the fact whether the right side is distinct.
if needSame {
hj.ht.Same = colexecutils.MaybeAllocateUint64Array(hj.ht.Same, hj.ht.Vals.Length()+1)
newAccountedFor := memsize.Uint64 * int64(cap(hj.ht.Same))
// hj.ht.Same will never shrink, so the delta is non-negative.
allocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableSame)
hj.accountedFor.hashtableSame = newAccountedFor
}

if needVisited {
hj.ht.Visited = colexecutils.MaybeAllocateBoolArray(hj.ht.Visited, hj.ht.Vals.Length()+1)
// At this point, we have fully built the hash table on the right side
// (meaning we have fully consumed the right input), so it'd be a shame
// to fallback to disk, thus, we use the unlimited allocator.
newAccountedFor := memsize.Bool * int64(cap(hj.ht.Visited))
// hj.ht.Visited will never shrink, so the delta is non-negative.
hj.outputUnlimitedAllocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableVisited)
allocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.hashtableVisited)
hj.accountedFor.hashtableVisited = newAccountedFor
// Since keyID = 0 is reserved for end of list, it can be marked as visited
// at the beginning.
// Since keyID = 0 is reserved for end of list, it can be marked as
// visited at the beginning.
hj.ht.Visited[0] = true
}

if hj.spec.trackBuildMatches {
if cap(hj.probeState.buildRowMatched) < hj.ht.Vals.Length() {
hj.probeState.buildRowMatched = make([]bool, hj.ht.Vals.Length())
} else {
hj.probeState.buildRowMatched = hj.probeState.buildRowMatched[:hj.ht.Vals.Length()]
for n := 0; n < hj.ht.Vals.Length(); n += copy(hj.probeState.buildRowMatched[n:], colexecutils.ZeroBoolColumn) {
}
}
hj.probeState.buildRowMatched = colexecutils.MaybeAllocateBoolArray(hj.probeState.buildRowMatched, hj.ht.Vals.Length())
newAccountedFor := memsize.Bool * int64(cap(hj.probeState.buildRowMatched))
// hj.probeState.buildRowMatched will never shrink, so the delta is
// non-negative.
allocator.AdjustMemoryUsageAfterAllocation(newAccountedFor - hj.accountedFor.buildRowMatched)
hj.accountedFor.buildRowMatched = newAccountedFor
}

hj.state = hjProbing
@@ -463,11 +478,9 @@ func (hj *hashJoiner) prepareForCollecting(batchSize int) {
return
}
if hj.spec.JoinType.IsLeftOuterOrFullOuter() {
if cap(hj.probeState.probeRowUnmatched) < batchSize {
hj.probeState.probeRowUnmatched = make([]bool, batchSize)
} else {
hj.probeState.probeRowUnmatched = hj.probeState.probeRowUnmatched[:batchSize]
}
hj.probeState.probeRowUnmatched = colexecutils.MaybeAllocateLimitedBoolArray(
hj.probeState.probeRowUnmatched, batchSize,
)
}
if cap(hj.probeState.buildIdx) < batchSize {
hj.probeState.buildIdx = make([]int, batchSize)
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/show_source
@@ -271,7 +271,7 @@ public descriptor_id_seq
public role_id_seq
public tenant_id_seq

query TTTTT colnames,rowsort
query TTTTT colnames
SELECT schema_name, table_name, type, owner, locality FROM [SHOW TABLES FROM system]
ORDER BY schema_name, table_name
----
@@ -328,7 +328,7 @@ public users table NULL NULL
public web_sessions table NULL NULL
public zones table NULL NULL

query TTTTTT colnames,rowsort
query TTTTTT colnames
SELECT schema_name, table_name, type, owner, locality, comment FROM [SHOW TABLES FROM system WITH COMMENT]
ORDER BY schema_name, table_name
----
5 changes: 5 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/tenant_builtins
@@ -380,3 +380,8 @@ query I
SELECT crdb_internal.create_tenant('{"name":"tenant-number-ten", "if_not_exists": true}'::JSONB)
----
NULL

subtest avoid_too_large_ids

query error tenant ID 10000000000 out of range
SELECT crdb_internal.create_tenant(10000000000)
14 changes: 12 additions & 2 deletions pkg/sql/tenant_creation.go
@@ -14,6 +14,7 @@ import (
"context"
gojson "encoding/json"
"fmt"
"math"
"strings"
"time"

@@ -62,6 +63,14 @@ func (p *planner) CreateTenant(
}
}

if ctcfg.ID != nil && *ctcfg.ID > math.MaxUint32 {
// Tenant creation via this interface (which includes
// crdb_internal.create_tenant) should be prevented from gobbling
// up the entire tenant ID space by asking for too large values.
// Otherwise, CREATE TENANT will not be possible any more.
return tid, pgerror.Newf(pgcode.ProgramLimitExceeded, "tenant ID %d out of range", *ctcfg.ID)
}

configTemplate := mtinfopb.TenantInfoWithUsage{}

return p.createTenantInternal(ctx, ctcfg, &configTemplate)
@@ -608,6 +617,8 @@ HAVING ($1 = '' OR NOT EXISTS (SELECT 1 FROM system.tenants t WHERE t.name = $1)
return roachpb.MakeTenantID(nextID)
}

var tenantIDSequenceFQN = tree.MakeTableNameWithSchema(catconstants.SystemDatabaseName, tree.PublicSchemaName, tree.Name(catconstants.TenantIDSequenceTableName))

// getTenantIDSequenceDesc retrieves a leased descriptor for the
// sequence system.tenant_id_seq.
func getTenantIDSequenceDesc(ctx context.Context, txn isql.Txn) (catalog.TableDescriptor, error) {
@@ -626,9 +637,8 @@ func getTenantIDSequenceDesc(ctx context.Context, txn isql.Txn) (catalog.TableDe
coll := itxn.Descriptors()

// Full name of the sequence.
tn := tree.MakeTableNameWithSchema(catconstants.SystemDatabaseName, tree.PublicSchemaName, tree.Name(catconstants.TenantIDSequenceTableName))
// Look up the sequence by name with lease.
_, desc, err := descs.PrefixAndTable(ctx, coll.ByNameWithLeased(txn.KV()).Get(), &tn)
_, desc, err := descs.PrefixAndTable(ctx, coll.ByNameWithLeased(txn.KV()).Get(), &tenantIDSequenceFQN)
if err != nil {
return nil, err
}
