-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
jobs_collection.go
89 lines (76 loc) · 2.5 KB
/
jobs_collection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)
// jobsCollection is a list of job IDs.
type jobsCollection []jobspb.JobID
// add appends the given job IDs to the end of the collection, keeping the
// order in which they were passed.
func (jc *jobsCollection) add(ids ...jobspb.JobID) {
	grown := append(*jc, ids...)
	*jc = grown
}
// reset discards every collected job ID, leaving the collection nil.
func (jc *jobsCollection) reset() {
	var empty jobsCollection
	*jc = empty
}
// txnJobsCollection is used to collect information about all jobs created in
// a transaction. It's also used as a cache of job records created outside the
// declarative schema changer.
type txnJobsCollection struct {
// created represents a list of job IDs that have been created and queued to
// system.jobs.
created jobsCollection
// uniqueToCreate contains job records unique to a descriptor ID. Typically,
// these jobs are created when mutating relations; we only allow one job
// per relation in one transaction. These jobs will be created and queued
// at commit time.
uniqueToCreate map[descpb.ID]*jobs.Record
// nonUniqueToCreate contains job records that are not unique to a
// descriptor ID. These jobs will be created and queued at commit time.
nonUniqueToCreate []*jobs.Record
}
// newTxnJobsCollection returns an empty txnJobsCollection whose
// uniqueToCreate map is already allocated, so entries can be stored in it
// without a separate initialization step.
func newTxnJobsCollection() *txnJobsCollection {
	collection := new(txnJobsCollection)
	collection.uniqueToCreate = make(map[descpb.ID]*jobs.Record)
	return collection
}
// addCreatedJobID appends the given job IDs to the list of created jobs.
func (j *txnJobsCollection) addCreatedJobID(jobID ...jobspb.JobID) {
	j.created = append(j.created, jobID...)
}
// addNonUniqueJobToCreate appends jobRecord to the list of job records that
// are not keyed by descriptor ID.
func (j *txnJobsCollection) addNonUniqueJobToCreate(jobRecord *jobs.Record) {
	pending := j.nonUniqueToCreate
	j.nonUniqueToCreate = append(pending, jobRecord)
}
// reset returns the collection to its empty state: the created job IDs and
// the non-unique records are dropped, and uniqueToCreate is emptied in place
// so the existing map keeps being used.
func (j *txnJobsCollection) reset() {
	j.created = nil
	j.nonUniqueToCreate = nil
	for descID := range j.uniqueToCreate {
		delete(j.uniqueToCreate, descID)
	}
}
// numToCreate returns the total number of job records waiting to be created,
// counting both the per-descriptor records and the non-unique ones.
func (j *txnJobsCollection) numToCreate() int {
	unique := len(j.uniqueToCreate)
	nonUnique := len(j.nonUniqueToCreate)
	return unique + nonUnique
}
// hasAnyToCreate reports whether at least one job record is waiting to be
// created.
func (j *txnJobsCollection) hasAnyToCreate() bool {
	pending := j.numToCreate()
	return pending != 0
}
// forEachToCreate invokes fn on every job record waiting to be created: first
// the records in uniqueToCreate (map iteration, so in no particular order),
// then the records in nonUniqueToCreate in slice order. Iteration stops at the
// first non-nil error returned by fn, and that error is returned to the
// caller; nil is returned when fn succeeds for every record.
func (j *txnJobsCollection) forEachToCreate(fn func(jobRecord *jobs.Record) error) error {
	for _, rec := range j.uniqueToCreate {
		err := fn(rec)
		if err != nil {
			return err
		}
	}
	// Snapshot the slice header so the loop walks the same elements even if fn
	// reassigns the field while we iterate.
	records := j.nonUniqueToCreate
	for i := range records {
		err := fn(records[i])
		if err != nil {
			return err
		}
	}
	return nil
}