Skip to content

Commit

Permalink
Merge #98531 #98534
Browse files Browse the repository at this point in the history
98531: sql: skip TestRelocateNonVoters r=mgartner a=mgartner

See #97320.

Epic: None

Release note: None

98534: jobs: avoid a dangling goroutine r=ajwerner a=knz

Release note: None
Epic: None

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Mar 14, 2023
3 parents 2ac6e51 + eaa6094 + cd7b335 commit 60e7c78
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 9 deletions.
6 changes: 5 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,12 @@ func (r *Registry) resumeJob(

// If the job's type was registered to disable tenant cost control, then
// exclude the job's costs from tenant accounting.
if opts, ok := options[payload.Type()]; ok && opts.disableTenantCostControl {
if opts, ok := getRegisterOptions(payload.Type()); ok && opts.disableTenantCostControl {
resumeCtx = multitenant.WithTenantCostControlExemption(resumeCtx)
}
if alreadyAdopted := r.addAdoptedJob(jobID, s, cancel); alreadyAdopted {
// Not needing the context after all. Avoid leaking resources.
cancel()
return nil
}

Expand All @@ -333,6 +335,8 @@ func (r *Registry) resumeJob(
_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
}); err != nil {
r.unregister(jobID)
// Also avoid leaking a goroutine in this case.
cancel()
return err
}
return nil
Expand Down
10 changes: 8 additions & 2 deletions pkg/jobs/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ import "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"

// ResetConstructors resets the registered Resumer constructors.
func ResetConstructors() func() {
globalMu.Lock()
defer globalMu.Unlock()
old := make(map[jobspb.Type]Constructor)
for k, v := range constructors {
for k, v := range globalMu.constructors {
old[k] = v
}
return func() { constructors = old }
return func() {
globalMu.Lock()
defer globalMu.Unlock()
globalMu.constructors = old
}
}
2 changes: 1 addition & 1 deletion pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
ExpiredPTS: metric.NewCounter(makeMetaExpiredPTS(typeStr)),
ProtectedAge: metric.NewGauge(makeMetaProtectedAge(typeStr)),
}
if opts, ok := options[jt]; ok && opts.metrics != nil {
if opts, ok := getRegisterOptions(jt); ok && opts.metrics != nil {
m.JobSpecificMetrics[jt] = opts.metrics
}
}
Expand Down
35 changes: 30 additions & 5 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,8 +1411,26 @@ type JobResultsReporter interface {
// that can be used by the other methods.
type Constructor func(job *Job, settings *cluster.Settings) Resumer

var constructors = make(map[jobspb.Type]Constructor)
var options = make(map[jobspb.Type]registerOptions)
// The constructors and options are protected behind a mutex because
// the unit tests in this package register constructors/options
// concurrently with the initialization of test servers, where jobs
// can get created and adopted.
var globalMu = struct {
syncutil.Mutex

constructors map[jobspb.Type]Constructor
options map[jobspb.Type]registerOptions
}{
constructors: make(map[jobspb.Type]Constructor),
options: make(map[jobspb.Type]registerOptions),
}

func getRegisterOptions(typ jobspb.Type) (registerOptions, bool) {
globalMu.Lock()
defer globalMu.Unlock()
opts, ok := globalMu.options[typ]
return opts, ok
}

// RegisterConstructor registers a Resumer constructor for a certain job type.
//
Expand All @@ -1422,7 +1440,10 @@ var options = make(map[jobspb.Type]registerOptions)
// engineers to explicitly pass one of these options so that they will be
// prompted to think about which is appropriate for their new job type.
func RegisterConstructor(typ jobspb.Type, fn Constructor, opts ...RegisterOption) {
constructors[typ] = fn
globalMu.Lock()
defer globalMu.Unlock()

globalMu.constructors[typ] = fn

// Apply all options to the struct.
var resOpts registerOptions
Expand All @@ -1433,12 +1454,16 @@ func RegisterConstructor(typ jobspb.Type, fn Constructor, opts ...RegisterOption
panic("when registering a new job type, either jobs.DisablesTenantCostControl " +
"or jobs.UsesTenantCostControl is required; see comments for these options to learn more")
}
options[typ] = resOpts
globalMu.options[typ] = resOpts
}

func (r *Registry) createResumer(job *Job, settings *cluster.Settings) (Resumer, error) {
payload := job.Payload()
fn := constructors[payload.Type()]
fn := func() Constructor {
globalMu.Lock()
defer globalMu.Unlock()
return globalMu.constructors[payload.Type()]
}()
if fn == nil {
return nil, errors.Errorf("no resumer is available for %s", payload.Type())
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/multitenant_admin_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -827,6 +828,8 @@ func TestRelocateNonVoters(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 97320, "flaky test")

testCases := []testCase{
{
desc: "ALTER RANGE x RELOCATE NONVOTERS",
Expand Down

0 comments on commit 60e7c78

Please sign in to comment.