-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathindex_backfiller.go
170 lines (155 loc) · 6.15 KB
/
index_backfiller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// IndexBackfillPlanner bundles the dependencies needed to plan and run an
// index backfill.
type IndexBackfillPlanner struct {
	// execCfg supplies the DB handle, key codec, cluster settings and DistSQL
	// planner that the backfill methods read from.
	execCfg *ExecutorConfig
	// ieFactory is handed to createSchemaChangeEvalCtx when the evaluation
	// context for the backfill plan is built.
	ieFactory sqlutil.SessionBoundInternalExecutorFactory
}
// NewIndexBackfiller returns an IndexBackfillPlanner that uses the supplied
// executor configuration and internal executor factory.
func NewIndexBackfiller(
	execCfg *ExecutorConfig, ieFactory sqlutil.SessionBoundInternalExecutorFactory,
) *IndexBackfillPlanner {
	planner := new(IndexBackfillPlanner)
	planner.execCfg = execCfg
	planner.ieFactory = ieFactory
	return planner
}
// BackfillIndex backfills the indexes listed in toBackfill on the given table
// descriptor. The spans still to be processed are obtained from tracker for
// the (table, source index) pair. Any error from the pre-scan, the resume-span
// lookup, planning or execution is returned unchanged.
//
// TODO(ajwerner): allow backfilling multiple indexes.
func (ib *IndexBackfillPlanner) BackfillIndex(
	ctx context.Context,
	tracker scexec.JobProgressTracker,
	descriptor catalog.TableDescriptor,
	source descpb.IndexID,
	toBackfill ...descpb.IndexID,
) error {
	// The read timestamp for the backfill is chosen arbitrarily. Any timestamp
	// is safe to read at, even after a partial backfill at an earlier
	// timestamp, because other writing transactions have been writing at the
	// appropriate timestamps in the meantime.
	readTS := ib.execCfg.DB.Clock().Now()

	// Collect the key span of every index that is about to be populated.
	spans := make([]roachpb.Span, 0, len(toBackfill))
	for _, id := range toBackfill {
		spans = append(spans, descriptor.IndexSpan(ib.execCfg.Codec, id))
	}

	// Scan the destination spans at the read timestamp before ingesting.
	err := ib.scanTargetSpansToPushTimestampCache(ctx, readTS, spans)
	if err != nil {
		return err
	}

	// TODO(dt): persist a write ts, don't rescan above.
	writeTS := readTS

	resumeSpans, err := tracker.GetResumeSpans(ctx, descriptor.GetID(), source)
	if err != nil {
		return err
	}

	// onMeta is handed to plan as the metadata callback; for now it only logs.
	onMeta := func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
		// TODO(ajwerner): Hook up the jobs tracking stuff.
		log.Infof(ctx, "got update: %v", meta)
		return nil
	}

	// The read timestamp doubles as the "now" timestamp for planning.
	run, err := ib.plan(
		ctx, descriptor, readTS, writeTS, readTS, resumeSpans, toBackfill, onMeta,
	)
	if err != nil {
		return err
	}
	return run(ctx)
}
// scanTargetSpansToPushTimestampCache reads every span in targetSpans inside a
// single transaction fixed at backfillTimestamp and discards the results.
//
// The index backfill ingests SSTs, which add their keys blindly and so do not
// play nicely with concurrently running transactions. Scanning the target
// spans at the time the SSTs' keys will be written resolves intents and
// populates the timestamp cache, which keeps anything else from sneaking in
// underneath the backfill. Since these are new indexes, the spans should be
// essentially empty, making this a quick and cheap scan.
func (ib *IndexBackfillPlanner) scanTargetSpansToPushTimestampCache(
	ctx context.Context, backfillTimestamp hlc.Timestamp, targetSpans []roachpb.Span,
) error {
	const pageSize = 10000

	scanAll := func(ctx context.Context, txn *kv.Txn) error {
		err := txn.SetFixedTimestamp(ctx, backfillTimestamp)
		if err != nil {
			return err
		}
		for i := range targetSpans {
			sp := targetSpans[i]
			// TODO(dt): a Count() request would be nice here if the target isn't
			// empty, since we don't need to drag all the results back just to
			// then ignore them -- we just need the iteration on the far end.
			err = txn.Iterate(ctx, sp.Key, sp.EndKey, pageSize, iterateNoop)
			if err != nil {
				return err
			}
		}
		return nil
	}

	return ib.execCfg.DB.Txn(ctx, scanAll)
}
// iterateNoop is an iteration callback that ignores the page of key/values it
// is given and always reports success.
func iterateNoop([]kv.KeyValue) error {
	return nil
}
// Compile-time assertion that *IndexBackfillPlanner satisfies the
// scexec.IndexBackfiller interface.
var _ scexec.IndexBackfiller = (*IndexBackfillPlanner)(nil)
// plan builds the physical plan for backfilling indexesToBackfill on tableDesc
// over sourceSpans, and returns a function that executes that plan.
//
// nowTimestamp is used to create the schema-change evaluation context, while
// writeAsOf and readAsOf are forwarded into the index backfiller spec. callback
// is installed as the metadata handler of the writer that receives the flow's
// output. Planning errors are returned directly; execution errors are reported
// by the returned function.
func (ib *IndexBackfillPlanner) plan(
	ctx context.Context,
	tableDesc catalog.TableDescriptor,
	nowTimestamp, writeAsOf, readAsOf hlc.Timestamp,
	sourceSpans []roachpb.Span,
	indexesToBackfill []descpb.IndexID,
	callback func(_ context.Context, meta *execinfrapb.ProducerMetadata) error,
) (runFunc func(context.Context) error, _ error) {
	// These are filled in by buildPlan and later consumed by the run closure.
	var (
		physPlan *PhysicalPlan
		evalCtx  extendedEvalContext
		planCtx  *PlanningCtx
	)
	mutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildExistingMutableTable()

	buildPlan := func(
		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
	) error {
		evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
		planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(
			ctx, &evalCtx, nil /* planner */, txn, true, /* distribute */
		)
		// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
		// batch size. Also plumb in a testing knob.
		chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
		spec, err := initIndexBackfillerSpec(
			*mutDesc.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill,
		)
		if err != nil {
			return err
		}
		physPlan, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(planCtx, spec, sourceSpans)
		return err
	}
	if err := DescsTxn(ctx, ib.execCfg, buildPlan); err != nil {
		return nil, err
	}

	run := func(ctx context.Context) error {
		metaWriter := MetadataCallbackWriter{
			rowResultWriter: &errOnlyResultWriter{},
			fn:              callback,
		}
		receiver := MakeDistSQLReceiver(
			ctx,
			&metaWriter,
			tree.Rows, /* stmtType - doesn't matter here since no result are produced */
			ib.execCfg.RangeDescriptorCache,
			nil, /* txn - the flow does not run wholly in a txn */
			ib.execCfg.Clock,
			evalCtx.Tracing,
			ib.execCfg.ContentionRegistry,
			nil, /* testingPushCallback */
		)
		defer receiver.Release()

		// Run is given a copy of the eval context rather than the shared one.
		evalCtxCopy := evalCtx
		cleanup := ib.execCfg.DistSQLPlanner.Run(planCtx, nil, physPlan, receiver, &evalCtxCopy, nil)
		cleanup()
		return metaWriter.Err()
	}
	return run, nil
}