Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
77297: roachtest: Increase import size for sst-corruption r=nicktrav a=itsbilal

On rare occasion(s), the grep for an sstable containing table
could fail, presumably due to the lack of ample sstables containing
table keys. This change bumps up the tpcc warehouse count for the
import to reduce the likelihood of this happening.

Fixes cockroachdb#77077. Hopefully.

Release note: None.

77306: jobs: Ensure schedules are cancelled when scheduler disabled. r=miretskiy a=miretskiy

Ensure that currently executing schedules are cancelled immediately
when jobs scheduler disabled via the `jobs.scheduler.enabled` setting.

Fixes cockroachdb#77248

Release Note (enterprise change): Currently executing schedules are
cancelled immediately when jobs scheduler disabled.

Release Justification: stability improvement.

Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed Mar 3, 2022
3 parents 1304061 + 399b32e + 3661ab1 commit 1e1d559
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/sstable_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func runSSTableCorruption(ctx context.Context, t test.Test, c cluster.Cluster) {
// to have multiple ranges, and some sstables with only table keys.
t.Status("importing tpcc fixture")
c.Run(ctx, workloadNode,
"./cockroach workload fixtures import tpcc --warehouses=100 --fks=false --checks=false")
"./cockroach workload fixtures import tpcc --warehouses=500 --fks=false --checks=false")
return nil
})
m.Wait()
Expand Down
60 changes: 56 additions & 4 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand Down Expand Up @@ -375,6 +376,54 @@ func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
return enabled
}

type syncCancelFunc struct {
syncutil.Mutex
context.CancelFunc
}

// newCancelWhenDisabled arranges for scheduler enabled setting callback to cancel
// currently executing context.
func newCancelWhenDisabled(sv *settings.Values) *syncCancelFunc {
sf := &syncCancelFunc{}
schedulerEnabledSetting.SetOnChange(sv, func(ctx context.Context) {
if !schedulerEnabledSetting.Get(sv) {
sf.Lock()
if sf.CancelFunc != nil {
sf.CancelFunc()
}
sf.Unlock()
}
})
return sf
}

// withCancelOnDisabled executes provided function with the context which will be cancelled
// if scheduler is disabled.
func (sf *syncCancelFunc) withCancelOnDisabled(
ctx context.Context, sv *settings.Values, f func(ctx context.Context) error,
) error {
ctx, cancel := func() (context.Context, context.CancelFunc) {
sf.Lock()
defer sf.Unlock()

ctx, cancel := context.WithCancel(ctx)
sf.CancelFunc = cancel

if !schedulerEnabledSetting.Get(sv) {
cancel()
}

return ctx, func() {
sf.Lock()
defer sf.Unlock()
cancel()
sf.CancelFunc = nil
}
}()
defer cancel()
return f(ctx)
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
initialDelay := getInitialScanDelay(s.TestingKnobs)
Expand All @@ -384,6 +433,8 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
log.Errorf(ctx, "error registering executor metrics: %+v", err)
}

whenDisabled := newCancelWhenDisabled(&s.Settings.SV)

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
select {
Expand All @@ -395,10 +446,11 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

maxSchedules := schedulerMaxJobsPerIterationSetting.Get(&s.Settings.SV)
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
if err != nil {
if err := whenDisabled.withCancelOnDisabled(ctx, &s.Settings.SV, func(ctx context.Context) error {
return s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
}); err != nil {
log.Errorf(ctx, "error executing schedules: %+v", err)
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,3 +826,84 @@ func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
})
}
}

type blockUntilCancelledExecutor struct {
started, done chan struct{}
}

var _ ScheduledJobExecutor = (*blockUntilCancelledExecutor)(nil)

func (e *blockUntilCancelledExecutor) ExecuteJob(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
txn *kv.Txn,
) error {
defer close(e.done)
close(e.started)
<-ctx.Done()
return ctx.Err()
}

func (e *blockUntilCancelledExecutor) NotifyJobTermination(
ctx context.Context,
jobID jobspb.JobID,
jobStatus Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
return nil
}

func (e *blockUntilCancelledExecutor) Metrics() metric.Struct {
return nil
}

func (e *blockUntilCancelledExecutor) GetCreateScheduleStatement(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
sj *ScheduledJob,
ex sqlutil.InternalExecutor,
) (string, error) {
return "", errors.AssertionFailedf("unexpected GetCreateScheduleStatement call")
}

func TestDisablingSchedulerCancelsSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const executorName = "block-until-cancelled-executor"
ex := &blockUntilCancelledExecutor{
started: make(chan struct{}),
done: make(chan struct{}),
}
defer registerScopedScheduledJobExecutor(executorName, ex)()

knobs := base.TestingKnobs{
JobsTestingKnobs: fastDaemonKnobs(overridePaceSetting(10 * time.Millisecond)),
}
ts, _, _ := serverutils.StartServer(t, base.TestServerArgs{Knobs: knobs})
defer ts.Stopper().Stop(context.Background())

// Create schedule which blocks until its context cancelled due to disabled scheduler.
// We only need to create one schedule. This is because
// scheduler executes its batch of schedules sequentially, and so, creating more
// than one doesn't change anything since we block.
schedule := NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)
schedule.SetScheduleLabel("test schedule")
schedule.SetOwner(security.TestUserName())
schedule.SetNextRun(timeutil.Now())
schedule.SetExecutionDetails(executorName, jobspb.ExecutionArguments{})
require.NoError(t, schedule.Create(
context.Background(), ts.InternalExecutor().(sqlutil.InternalExecutor), nil))

<-ex.started
// Disable scheduler and verify all running schedules were cancelled.
schedulerEnabledSetting.Override(context.Background(), &ts.ClusterSettings().SV, false)
<-ex.done
}

0 comments on commit 1e1d559

Please sign in to comment.