-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathgc_job.go
220 lines (198 loc) · 6.65 KB
/
gc_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package gcjob
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
// MaxSQLGCInterval is the upper bound on how long the GC job waits between
// checks for schema elements that are ready to be garbage-collected. It is a
// variable rather than a constant so that tests can shorten it (see
// SetSmallMaxGCIntervalForTest).
var MaxSQLGCInterval = 5 * time.Minute
// SetSmallMaxGCIntervalForTest lowers MaxSQLGCInterval to 500ms and returns a
// closure that restores the value it had before the call.
//
// The function itself must run immediately; only the returned closure should
// be deferred:
//
//	defer SetSmallMaxGCIntervalForTest()()
//
// Writing `defer SetSmallMaxGCIntervalForTest()` would instead postpone the
// change until the surrounding function returns and discard the restore
// closure. The package-level variable is mutated without synchronization.
func SetSmallMaxGCIntervalForTest() func() {
	prev := MaxSQLGCInterval
	MaxSQLGCInterval = 500 * time.Millisecond
	return func() {
		MaxSQLGCInterval = prev
	}
}
// schemaChangeGCResumer is the jobs.Resumer constructed for jobs of type
// jobspb.TypeSchemaChangeGC.
type schemaChangeGCResumer struct {
	// jobID is the ID of the job being resumed. It is handed to the
	// RunBeforeResume testing knob, initDetailsAndProgress, refreshTables and
	// persistProgress.
	jobID int64
}
// performGC runs one garbage-collection pass over the schema elements tracked
// by the job.
//
// When details lists indexes, only gcIndexes is invoked. Otherwise, when it
// lists tables, gcTables is invoked; afterwards, if details.ParentID is set
// and isDoneGC reports that everything has been collected, the zone config of
// that parent database is deleted as well.
//
// The returned bool is whatever gcIndexes/gcTables reported about having done
// any work; it is false whenever an error is returned.
func performGC(
	ctx context.Context,
	execCfg *sql.ExecutorConfig,
	details *jobspb.SchemaChangeGCDetails,
	progress *jobspb.SchemaChangeGCProgress,
) (bool, error) {
	switch {
	case details.Indexes != nil:
		collected, err := gcIndexes(ctx, execCfg, details.ParentID, progress)
		if err != nil {
			return false, errors.Wrap(err, "attempting to GC indexes")
		}
		return collected, nil

	case details.Tables != nil:
		collected, err := gcTables(ctx, execCfg, progress)
		if err != nil {
			return false, errors.Wrap(err, "attempting to GC tables")
		}
		// Once every table is gone, the parent database's zone config can go
		// too.
		if details.ParentID != descpb.InvalidID && isDoneGC(progress) {
			if err := deleteDatabaseZoneConfig(ctx, execCfg.DB, execCfg.Codec, details.ParentID); err != nil {
				return false, errors.Wrap(err, "deleting database zone config")
			}
		}
		return collected, nil

	default:
		return false, nil
	}
}
// Resume is part of the jobs.Resumer interface.
//
// It loads the job's details and progress. It then truncates any interleaved
// indexes listed in details.InterleavedIndexes. Finally it loops until every
// schema element the job tracks has been garbage-collected. Each iteration
// waits for one of three events:
//   - a notification from execCfg.GCJobNotifier, which refreshes element
//     status;
//   - the GC timer firing, which refreshes status, runs performGC and persists
//     progress if anything was collected;
//   - ctx being canceled.
//
// The timer delay never exceeds MaxSQLGCInterval.
//
// Any returned error that isPermanentGCError does not classify as permanent
// is marked with jobs.NewRetryJobError("gc").
func (r schemaChangeGCResumer) Resume(
	ctx context.Context, phs interface{}, _ chan<- tree.Datums,
) (err error) {
	defer func() {
		if err != nil && !r.isPermanentGCError(err) {
			err = errors.Mark(err, jobs.NewRetryJobError("gc"))
		}
	}()
	p := phs.(sql.PlanHookState)
	// TODO(pbardea): Wait for no versions.
	execCfg := p.ExecCfg()
	if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil {
		if err := fn(r.jobID); err != nil {
			return err
		}
	}
	details, progress, err := initDetailsAndProgress(ctx, execCfg, r.jobID)
	if err != nil {
		return err
	}

	// If there are any interleaved indexes to drop as part of a table TRUNCATE
	// operation, then drop the indexes before waiting on the GC timer.
	if len(details.InterleavedIndexes) > 0 {
		// Before deleting any indexes, ensure that old versions of the table
		// descriptor are no longer in use.
		if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID); err != nil {
			return err
		}
		if err := sql.TruncateInterleavedIndexes(
			ctx,
			execCfg,
			tabledesc.NewImmutable(*details.InterleavedTable),
			details.InterleavedIndexes,
		); err != nil {
			return err
		}
	}

	// Register for notifications from the GC job notifier; the returned
	// cleanup function is run when Resume returns.
	gossipUpdateC, cleanup := execCfg.GCJobNotifier.AddNotifyee(ctx)
	defer cleanup()
	tableDropTimes, indexDropTimes := getDropTimes(details)

	allTables := getAllTablesWaitingForGC(details, progress)
	if len(allTables) == 0 {
		return nil
	}
	expired, earliestDeadline := refreshTables(ctx, execCfg, allTables, tableDropTimes, indexDropTimes, r.jobID, progress)

	timer := timeutil.NewTimer()
	defer timer.Stop()
	timer.Reset(nextGCCheckDelay(expired, earliestDeadline))
	for {
		select {
		case <-gossipUpdateC:
			// Upon notification of a gossip update, update the status of the relevant schema elements.
			if log.V(2) {
				log.Info(ctx, "received a new system config")
			}
			remainingTables := getAllTablesWaitingForGC(details, progress)
			if len(remainingTables) == 0 {
				return nil
			}
			expired, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)
			if isDoneGC(progress) {
				return nil
			}
			timer.Reset(nextGCCheckDelay(expired, earliestDeadline))

		case <-timer.C:
			timer.Read = true
			if log.V(2) {
				log.Info(ctx, "SchemaChangeGC timer triggered")
			}
			// Refresh the status of all tables in case any GC TTLs have changed.
			remainingTables := getAllTablesWaitingForGC(details, progress)
			_, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)
			if didWork, err := performGC(ctx, execCfg, details, progress); err != nil {
				return err
			} else if didWork {
				persistProgress(ctx, execCfg, r.jobID, progress)
			}
			if isDoneGC(progress) {
				return nil
			}
			// Schedule the next check for GC. The expired flag from the refresh
			// above is not consulted here: it was computed before performGC ran.
			timer.Reset(nextGCCheckDelay(false /* expired */, earliestDeadline))

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// nextGCCheckDelay returns how long Resume should wait before its next GC
// check. It returns zero when expired is true. Otherwise it returns the time
// remaining until earliestDeadline, capped at MaxSQLGCInterval. A deadline
// already in the past yields a negative delay, which is returned unchanged.
func nextGCCheckDelay(expired bool, earliestDeadline time.Time) time.Duration {
	if expired {
		return 0
	}
	if d := timeutil.Until(earliestDeadline); d < MaxSQLGCInterval {
		return d
	}
	return MaxSQLGCInterval
}
// OnFailOrCancel is part of the jobs.Resumer interface. This implementation
// performs no cleanup and always returns nil.
func (schemaChangeGCResumer) OnFailOrCancel(_ context.Context, _ interface{}) error {
	return nil
}
// isPermanentGCError reports whether err is a permanent job failure, i.e. one
// after which the failed GC job cannot be retried.
//
// It uses a value receiver to match the type's other methods (Resume,
// OnFailOrCancel). It reads no fields, and pointer callers still have it in
// their method set.
func (r schemaChangeGCResumer) isPermanentGCError(err error) bool {
	// Currently errors are classified with the schema change classifier so
	// that this can be backported to 20.2 and 21.1. This should change once
	// #44594 is implemented.
	return sql.IsPermanentSchemaChangeError(err)
}
// init registers, via jobs.RegisterConstructor, a constructor that builds a
// schemaChangeGCResumer for jobs of type jobspb.TypeSchemaChangeGC.
func init() {
	jobs.RegisterConstructor(
		jobspb.TypeSchemaChangeGC,
		func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
			return &schemaChangeGCResumer{jobID: *job.ID()}
		},
	)
}