-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathcollection.go
406 lines (357 loc) · 15.1 KB
/
collection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Package descs provides abstractions for dealing with sets of descriptors.
// It is utilized during schema changes and by catalog.Accessor implementations.
package descs
import (
"bytes"
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydratedtables"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// makeCollection assembles a Collection from its node-level dependencies.
//
// The SQL codec is taken from leaseMgr when one is supplied. A nil leaseMgr is
// tolerated (for tests), in which case keys.SystemSQLCodec is used instead.
func makeCollection(
	leaseMgr *lease.Manager,
	settings *cluster.Settings,
	hydratedTables *hydratedtables.Cache,
	virtualSchemas catalog.VirtualSchemas,
	temporarySchemaProvider TemporarySchemaProvider,
) Collection {
	sqlCodec := keys.SystemSQLCodec
	if leaseMgr != nil {
		sqlCodec = leaseMgr.Codec()
	}

	// Populate the sub-collections one at a time; every other field keeps its
	// zero value.
	var tc Collection
	tc.settings = settings
	tc.hydratedTables = hydratedTables
	tc.virtual = makeVirtualDescriptors(virtualSchemas)
	tc.leased = makeLeasedDescriptors(leaseMgr)
	tc.synthetic = makeSyntheticDescriptors()
	tc.uncommitted = makeUncommittedDescriptors()
	tc.kv = makeKVDescriptors(sqlCodec)
	tc.temporary = makeTemporaryDescriptors(sqlCodec, temporarySchemaProvider)
	return tc
}
// Collection is a collection of descriptors held by a single session that
// serves SQL requests, or a background job using descriptors. The
// collection is cleared using ReleaseAll() which is called at the
// end of each transaction on the session, or on hitting conditions such
// as errors, or retries that result in transaction timestamp changes.
type Collection struct {
// settings dictate whether we validate descriptors on write: WriteDescToBatch
// reads the ValidateOnWriteEnabled cluster setting from it.
settings *cluster.Settings
// virtual optionally holds the virtual schemas.
virtual virtualDescriptors
// leased is a collection of descriptors valid for the timestamp. They are
// released once the transaction using them is complete.
leased leasedDescriptors
// uncommitted holds descriptors modified by the uncommitted transaction
// affiliated with this Collection. This allows a transaction to see its own
// modifications while bypassing the descriptor lease mechanism. The lease
// mechanism will have its own transaction to read the descriptor and will
// hang waiting for the uncommitted changes to the descriptor if this
// transaction is PRIORITY HIGH. These descriptors are local to this
// Collection and their state is thus not visible to other transactions.
uncommitted uncommittedDescriptors
// kv is a collection of descriptors which were read from the store. It also
// carries the SQL codec returned by the codec() method.
kv kvDescriptors
// synthetic contains in-memory descriptors which override all
// other matching descriptors during immutable descriptor resolution (by name
// or by ID), but should not be written to disk. These support internal
// queries which need to use a special modified descriptor (e.g. validating
// non-public schema elements during a schema change). Attempting to resolve
// a mutable descriptor by name or ID when a matching synthetic descriptor
// exists is illegal.
synthetic syntheticDescriptors
// temporary contains logic to access temporary schema descriptors.
temporary temporaryDescriptors
// hydratedTables is a node-level cache of table descriptors which utilize
// user-defined types.
hydratedTables *hydratedtables.Cache
// skipValidationOnWrite should only be set to true during forced descriptor
// repairs. It is set by SkipValidationOnWrite and consulted by
// WriteDescToBatch.
skipValidationOnWrite bool
// deletedDescs holds dropped descriptors that will not need to wait for new
// lease versions. Entries are appended by AddDeletedDescriptor and the slice
// is cleared by ReleaseAll.
deletedDescs []catalog.Descriptor
// maxTimestampBoundDeadlineHolder contains the maximum timestamp to read
// schemas at. This is only set during the retries of bounded_staleness when
// nearest_only=True, in which we want a schema read that should be no older
// than MaxTimestampBound.
maxTimestampBoundDeadlineHolder maxTimestampBoundDeadlineHolder
// sqlLivenessSession is a sqlliveness.Session which may be optionally set
// (see SetSession). It must be set in the multi-tenant environment for
// ephemeral SQL pods. It should not be set otherwise. ReleaseLeases clears it.
sqlLivenessSession sqlliveness.Session
}
// Compile-time assertion that *Collection implements catalog.Accessor.
var _ catalog.Accessor = (*Collection)(nil)
// MaybeUpdateDeadline updates the deadline in a given transaction
// based on the leased descriptors in this collection. This update is
// only done when a deadline exists.
//
// The work is delegated to the leased descriptor set, which is also handed the
// Collection's sqlliveness session (nil unless SetSession has installed one
// since the last ReleaseLeases). Any error it reports is returned unchanged.
func (tc *Collection) MaybeUpdateDeadline(ctx context.Context, txn *kv.Txn) (err error) {
return tc.leased.maybeUpdateDeadline(ctx, txn, tc.sqlLivenessSession)
}
// SetMaxTimestampBound sets the maximum timestamp to read schemas at. The
// value is stored in the Collection's maxTimestampBoundDeadlineHolder and
// stays in effect until overwritten or cleared by ResetMaxTimestampBound.
func (tc *Collection) SetMaxTimestampBound(maxTimestampBound hlc.Timestamp) {
tc.maxTimestampBoundDeadlineHolder.maxTimestampBound = maxTimestampBound
}
// ResetMaxTimestampBound resets the maximum timestamp to read schemas at by
// storing the zero hlc.Timestamp, undoing any earlier SetMaxTimestampBound.
func (tc *Collection) ResetMaxTimestampBound() {
tc.maxTimestampBoundDeadlineHolder.maxTimestampBound = hlc.Timestamp{}
}
// SkipValidationOnWrite avoids validating uncommitted descriptors prior to
// a transaction commit. Once called, WriteDescToBatch no longer runs
// catalog.ValidateSelf regardless of the ValidateOnWriteEnabled setting.
//
// NOTE(review): nothing visible in this file resets the flag again, including
// ReleaseAll — confirm that is intended.
func (tc *Collection) SkipValidationOnWrite() {
tc.skipValidationOnWrite = true
}
// ReleaseSpecifiedLeases releases the leases for the descriptors with ids in
// the passed slice. Errors are logged but ignored. The release itself is
// performed by the Collection's leased descriptor set.
func (tc *Collection) ReleaseSpecifiedLeases(ctx context.Context, descs []lease.IDVersion) {
tc.leased.release(ctx, descs)
}
// ReleaseLeases releases all leases. Errors are logged but ignored.
// It also drops the sqlliveness session installed via SetSession, so a new
// session must be set before it is needed again.
func (tc *Collection) ReleaseLeases(ctx context.Context) {
tc.leased.releaseAll(ctx)
// Clear the associated sqlliveness.Session.
tc.sqlLivenessSession = nil
}
// ReleaseAll releases all state currently held by the Collection.
// ReleaseAll calls ReleaseLeases (which also clears the sqlliveness session),
// then resets the uncommitted, kv and synthetic descriptor sets and forgets
// the descriptors recorded through AddDeletedDescriptor.
func (tc *Collection) ReleaseAll(ctx context.Context) {
tc.ReleaseLeases(ctx)
tc.uncommitted.reset()
tc.kv.reset()
tc.synthetic.reset()
tc.deletedDescs = nil
}
// HasUncommittedTables returns true if the Collection contains uncommitted
// tables, as reported by its uncommitted descriptor set.
func (tc *Collection) HasUncommittedTables() bool {
return tc.uncommitted.hasUncommittedTables()
}
// HasUncommittedTypes returns true if the Collection contains uncommitted
// types, as reported by its uncommitted descriptor set.
func (tc *Collection) HasUncommittedTypes() bool {
return tc.uncommitted.hasUncommittedTypes()
}
// Satisfy the linter: reference HasUncommittedTypes through a method
// expression so that it counts as used.
var _ = (*Collection).HasUncommittedTypes
// AddUncommittedDescriptor adds an uncommitted descriptor modified in the
// transaction to the Collection. The descriptor must either be a new
// descriptor, carry the original version, or carry the version immediately
// following the original version.
//
// Subsequent attempts to resolve this descriptor mutably, either by name or ID
// will return this exact object. Subsequent attempts to resolve this descriptor
// immutably will return a copy of the descriptor in the current state. A deep
// copy is performed in this call.
//
// The descriptor is checked in to the uncommitted descriptor set; any error
// from that check-in is returned unchanged.
func (tc *Collection) AddUncommittedDescriptor(desc catalog.MutableDescriptor) error {
return tc.uncommitted.checkIn(desc)
}
// ValidateOnWriteEnabled is the cluster setting used to enable or disable
// validating descriptors prior to writing. It is registered under the key
// "sql.catalog.descs.validate_on_write.enabled", defaults to true, and is
// consulted by Collection.WriteDescToBatch.
var ValidateOnWriteEnabled = settings.RegisterBoolSetting(
"sql.catalog.descs.validate_on_write.enabled",
"set to true to validate descriptors prior to writing, false to disable; default is true",
true, /* defaultValue */
)
// WriteDescToBatch stages a write of desc into b. In order, it:
//
//  1. calls desc.MaybeIncrementVersion;
//  2. runs catalog.ValidateSelf on desc, unless SkipValidationOnWrite was
//     called on this Collection or the ValidateOnWriteEnabled cluster setting
//     is off;
//  3. registers desc with the Collection as an uncommitted descriptor;
//  4. adds the descriptor write, keyed by desc's ID, to b.
//
// The first error encountered is returned and the remaining steps are skipped.
// The batch is only populated, not run.
func (tc *Collection) WriteDescToBatch(
	ctx context.Context, kvTrace bool, desc catalog.MutableDescriptor, b *kv.Batch,
) error {
	desc.MaybeIncrementVersion()

	// The cluster setting is only read when validation was not force-skipped.
	shouldValidate := !tc.skipValidationOnWrite && ValidateOnWriteEnabled.Get(&tc.settings.SV)
	if shouldValidate {
		if validationErr := catalog.ValidateSelf(desc); validationErr != nil {
			return validationErr
		}
	}

	addErr := tc.AddUncommittedDescriptor(desc)
	if addErr != nil {
		return addErr
	}

	return catalogkv.WriteDescToBatch(
		ctx, kvTrace, tc.settings, b, tc.codec(), desc.GetID(), desc,
	)
}
// WriteDesc writes desc using a fresh batch created from txn: the batch is
// filled in by WriteDescToBatch and then run on txn. If WriteDescToBatch
// fails, the batch is never run and that error is returned; otherwise the
// result of running the batch is returned.
func (tc *Collection) WriteDesc(
	ctx context.Context, kvTrace bool, desc catalog.MutableDescriptor, txn *kv.Txn,
) error {
	batch := txn.NewBatch()
	err := tc.WriteDescToBatch(ctx, kvTrace, desc, batch)
	if err != nil {
		return err
	}
	return txn.Run(ctx, batch)
}
// GetDescriptorsWithNewVersion returns all the IDVersion pairs that have
// undergone a schema change, or nil when there were no schema changes. The
// version returned for each schema change is ClusterVersion - 1, because
// that's the one that will be used when checking for table descriptor two
// version invariance.
func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.IDVersion) {
	// collect records the original version of every descriptor visited.
	collect := func(_ catalog.NameEntry, original lease.IDVersion) error {
		originalVersions = append(originalVersions, original)
		return nil
	}
	// The callback itself never returns an error; the iteration's result is
	// deliberately discarded.
	_ = tc.uncommitted.iterateNewVersionByID(collect)
	return originalVersions
}
// GetUncommittedTables returns all the tables updated or created in the
// transaction, as tracked by the Collection's uncommitted descriptor set.
func (tc *Collection) GetUncommittedTables() (tables []catalog.TableDescriptor) {
return tc.uncommitted.getUncommittedTables()
}
// newMutableSyntheticDescriptorAssertionError builds the assertion-failure
// error reported when the descriptor with the given ID is requested mutably
// even though a synthetic descriptor exists for it. It has no caller in this
// file.
func newMutableSyntheticDescriptorAssertionError(id descpb.ID) error {
return errors.AssertionFailedf("attempted mutable access of synthetic descriptor %d", id)
}
// GetAllDescriptors returns all descriptors visible by the transaction.
// The lookup is delegated to the Collection's kv descriptor set, which is
// expected to reuse its cached result when it has one and fall back to a
// key-value scan otherwise (presumably; see kvDescriptors for the details).
func (tc *Collection) GetAllDescriptors(
ctx context.Context, txn *kv.Txn,
) ([]catalog.Descriptor, error) {
return tc.kv.getAllDescriptors(ctx, txn)
}
// GetAllDatabaseDescriptors returns all database descriptors visible by the
// transaction. The lookup is delegated to the Collection's kv descriptor set,
// which is expected to consult its cache before scanning system.namespace and
// looking up the descriptors (presumably; see kvDescriptors for the details).
func (tc *Collection) GetAllDatabaseDescriptors(
ctx context.Context, txn *kv.Txn,
) ([]catalog.DatabaseDescriptor, error) {
return tc.kv.getAllDatabaseDescriptors(ctx, txn)
}
// GetAllTableDescriptorsInDatabase returns all the table descriptors visible to
// the transaction whose parent is the database with the given ID.
//
// dbID is first resolved through getDatabaseByID (cached lookups allowed); an
// undefined-database error naming "[<dbID>]" is returned if no such database
// is found. The result is then built by filtering GetAllDescriptors down to
// table descriptors whose parent ID equals dbID. It is nil when none match.
func (tc *Collection) GetAllTableDescriptorsInDatabase(
	ctx context.Context, txn *kv.Txn, dbID descpb.ID,
) ([]catalog.TableDescriptor, error) {
	// Ensure the given ID does indeed belong to a database.
	lookupFlags := tree.DatabaseLookupFlags{AvoidCached: false}
	dbFound, _, err := tc.getDatabaseByID(ctx, txn, dbID, lookupFlags)
	switch {
	case err != nil:
		return nil, err
	case !dbFound:
		return nil, sqlerrors.NewUndefinedDatabaseError(fmt.Sprintf("[%d]", dbID))
	}

	all, err := tc.GetAllDescriptors(ctx, txn)
	if err != nil {
		return nil, err
	}

	var tables []catalog.TableDescriptor
	for _, d := range all {
		if d.GetParentID() != dbID {
			continue
		}
		tbl, isTable := d.(catalog.TableDescriptor)
		if !isTable {
			continue
		}
		tables = append(tables, tbl)
	}
	return tables, nil
}
// GetSchemasForDatabase returns the schemas for a given database
// visible by the transaction. This uses the schema cache locally
// if possible, or else performs a scan on kv. The lookup is delegated to the
// Collection's kv descriptor set. The result is keyed by descriptor ID;
// presumably each value is the corresponding schema name (confirm against
// kvDescriptors.getSchemasForDatabase).
func (tc *Collection) GetSchemasForDatabase(
ctx context.Context, txn *kv.Txn, dbID descpb.ID,
) (map[descpb.ID]string, error) {
return tc.kv.getSchemasForDatabase(ctx, txn, dbID)
}
// GetObjectNamesAndIDs returns the names and IDs of all objects in a database
// and schema.
//
// If the virtual descriptor set recognizes the schema, its answer is returned
// directly. Otherwise the schema is resolved by name, using lookup flags
// derived from flags, and the object-name key span for that (database, schema)
// pair is scanned through txn. For every row, the key suffix following the
// span prefix is decoded as the object name and the row's integer value is
// used as the object's descriptor ID.
//
// The two returned slices are parallel: element i of the IDs belongs to
// element i of the names. Each name is qualified with the database and schema
// names, and its ExplicitCatalog and ExplicitSchema fields are both set to
// flags.ExplicitPrefix. When the schema lookup yields no schema and no error
// (only possible when flags.Required is false), all results are nil.
func (tc *Collection) GetObjectNamesAndIDs(
	ctx context.Context,
	txn *kv.Txn,
	dbDesc catalog.DatabaseDescriptor,
	scName string,
	flags tree.DatabaseListFlags,
) (tree.TableNames, descpb.IDs, error) {
	if ok, names, ds := tc.virtual.maybeGetObjectNamesAndIDs(
		scName, dbDesc, flags,
	); ok {
		return names, ds, nil
	}

	schemaFlags := tree.SchemaLookupFlags{
		Required:       flags.Required,
		AvoidCached:    flags.RequireMutable || flags.AvoidCached,
		IncludeDropped: flags.IncludeDropped,
		IncludeOffline: flags.IncludeOffline,
	}
	schema, err := tc.getSchemaByName(ctx, txn, dbDesc, scName, schemaFlags)
	if err != nil {
		return nil, nil, err
	}
	if schema == nil { // required must have been false
		return nil, nil, nil
	}

	log.Eventf(ctx, "fetching list of objects for %q", dbDesc.GetName())
	prefix := catalogkeys.MakeObjectNameKey(tc.codec(), dbDesc.GetID(), schema.GetID(), "")
	sr, err := txn.Scan(ctx, prefix, prefix.PrefixEnd(), 0 /* maxRows */)
	if err != nil {
		return nil, nil, err
	}

	// The catalog and schema parts of every resulting name are identical, so
	// convert them once rather than once per row.
	dbName, schemaName := tree.Name(dbDesc.GetName()), tree.Name(scName)

	var tableNames tree.TableNames
	var tableIDs descpb.IDs
	for _, row := range sr {
		// What follows the scan prefix in the key is the encoded object name.
		_, tableName, err := encoding.DecodeUnsafeStringAscending(
			bytes.TrimPrefix(row.Key, prefix), nil)
		if err != nil {
			return nil, nil, err
		}
		tn := tree.MakeTableNameWithSchema(dbName, schemaName, tree.Name(tableName))
		tn.ExplicitCatalog = flags.ExplicitPrefix
		tn.ExplicitSchema = flags.ExplicitPrefix
		tableNames = append(tableNames, tn)
		tableIDs = append(tableIDs, descpb.ID(row.ValueInt()))
	}
	return tableNames, tableIDs, nil
}
// SetSyntheticDescriptors sets the provided descriptors as the synthetic
// descriptors to override all other matching descriptors during immutable
// access. An immutable copy is made if the descriptor is mutable. See the
// documentation on the Collection's synthetic field. The synthetic set is
// emptied again by ReleaseAll.
func (tc *Collection) SetSyntheticDescriptors(descs []catalog.Descriptor) {
tc.synthetic.set(descs)
}
// codec returns the SQL codec this Collection uses to build keys. It is the
// codec held by the kv descriptor set, which makeCollection initializes.
func (tc *Collection) codec() keys.SQLCodec {
return tc.kv.codec
}
// AddDeletedDescriptor temporarily tracks a descriptor that has been deleted
// directly from an add state, without any intermediate steps.
// Any descriptors marked as deleted will be skipped for the
// wait for one version logic inside descs.Txn, since they will no longer
// be inside storage.
// Note that this happens, at time of writing, only when reverting an
// IMPORT or RESTORE.
// The tracked descriptors are forgotten on ReleaseAll.
func (tc *Collection) AddDeletedDescriptor(desc catalog.Descriptor) {
tc.deletedDescs = append(tc.deletedDescs, desc)
}
// SetSession sets the sqlliveness.Session for the transaction. This
// should only be called in a multi-tenant environment. The session is passed
// along by MaybeUpdateDeadline and is dropped again by ReleaseLeases (and
// therefore by ReleaseAll).
func (tc *Collection) SetSession(session sqlliveness.Session) {
tc.sqlLivenessSession = session
}