From d9726d2ea3661b5dda1fd7ccd1a5ba6e05d8c8fc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 2 Nov 2022 13:53:11 -0700 Subject: [PATCH 1/4] sql: do not print stack trace when logging if txn is not open After executing each statement, that statement might be logged. If there were any audit events, then we attempt to resolve the table names for which the audit events have occurred. To do the resolution we're using the current txn. Previously, if that txn has been aborted or committed, it would result in a scary-looking stack trace added to the log, and this commit fixes it. Release note: None --- pkg/sql/exec_log.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 4ce38aa6d11a..dfba246c1712 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -167,6 +168,8 @@ func (p *planner) maybeLogStatement( p.maybeLogStatementInternal(ctx, execType, isCopy, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, stmtFingerprintID, queryStats) } +var errTxnIsNotOpen = errors.New("txn is already committed or rolled back") + func (p *planner) maybeLogStatementInternal( ctx context.Context, execType executorType, @@ -323,13 +326,19 @@ func (p *planner) maybeLogStatementInternal( mode = "rw" } tableName := "" + var tn *tree.TableName // We only have a valid *table* name if the object being // audited is table-like (includes view, sequence etc). For // now, this is sufficient because the auditing feature can // only audit tables. If/when the mechanisms are extended to // audit databases and schema, we need more logic here to // extract a name to include in the logging events. 
- tn, err := p.getQualifiedTableName(ctx, ev.desc) + if p.txn != nil && p.txn.IsOpen() { + // Only open txn accepts further commands. + tn, err = p.getQualifiedTableName(ctx, ev.desc) + } else { + err = errTxnIsNotOpen + } if err != nil { log.Warningf(ctx, "name for audited table ID %d not found: %v", ev.desc.GetID(), err) } else { From 2ba983d1731548db3f834c3995f5568f1443f4d0 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 9 Nov 2022 00:17:41 +0000 Subject: [PATCH 2/4] jobs: clear job claim after execution Since #89014 the job system reset a job's claim when transitioning it from pause-requested to paused and from cancel-requested to reverting. The job system signals these transitions to the running Resumer by cancelling the job's context and does not wait for the resumer to exit. Once the claim is clear, another node can adopt the job and start running its OnFailOrCancel callback. As a result, clearing the claim makes it more likely that OnFailOrCancel executions will overlap with Resume executions. In general, Jobs need to assume that Resume may still be running while OnFailOrCancel is called. But, making it more likely isn't in our interest. Here, we only clear the lease when we exit the job state machine. This makes it much more likely that OnFailOrCancel doesn't start until Resume has returned. 
Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + .../changefeedccl/alter_changefeed_test.go | 3 +- pkg/jobs/adopt.go | 41 +++++++++++++-- pkg/jobs/jobs_test.go | 51 +++++++++++++++++-- pkg/jobs/registry.go | 2 +- pkg/testutils/jobutils/jobs_verification.go | 13 +++++ 6 files changed, 101 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5cccc7d139bf..ce5b5782aeb6 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -250,6 +250,7 @@ go_test( "//pkg/sql/types", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 39aefc5108fb..759df1a35439 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -426,7 +427,7 @@ func TestAlterChangefeedTelemetry(t *testing.T) { feed := testFeed.(cdctest.EnterpriseTestFeed) require.NoError(t, feed.Pause()) - + jobutils.WaitForJobToHaveNoLease(t, sqlDB, feed.JobID()) sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json`, feed.JobID())) counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 69a7c6ed399e..24557ddcada4 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -417,26 +417,57 @@ func (r *Registry) runJob( 
log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3 + AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. 
+ r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */ + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 243cab6fd063..a312245fd419 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -21,6 +21,7 @@ import ( "runtime/pprof" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1211,10 +1212,11 @@ func TestJobLifecycle(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -1469,6 +1471,10 @@ func TestJobLifecycle(t *testing.T) { t.Fatal(err) } + // Wait for job to be adopted so that we have the + // lease and can move to succeeded. + resumeSignaler.WaitForResumeStarted() + // PauseRequested fails after job is successful. 
if err := job.Succeeded(ctx); err != nil { t.Fatal(err) @@ -3109,6 +3115,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3125,10 +3160,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3160,9 +3196,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3175,6 +3218,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t 
*testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3208,6 +3252,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index af16ea4a9470..ce7d1a8e3e41 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1576,7 +1576,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index 1872596d41c7..be8febec0f5b 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -89,6 +89,19 @@ func waitForJobToHaveStatus( }, 2*time.Minute) } +func WaitForJobToHaveNoLease(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + testutils.SucceedsWithin(t, func() error { + var sessionID []byte + var instanceID gosql.NullInt64 + db.QueryRow(t, `SELECT claim_session_id, claim_instance_id FROM system.jobs WHERE id = $1`, jobID).Scan(&sessionID, &instanceID) + if sessionID == nil && !instanceID.Valid { + return nil + } + return errors.Newf("job %d still has claim information") + }, 2*time.Minute) +} + // RunJob runs the provided job control statement, initializing, notifying and // closing the chan at the passed pointer (see below for why) and returning the // jobID and error result. 
PAUSE JOB and CANCEL JOB are racy in that it's hard From deb0896bdd7e43ab457f6edb5a2834067ee87cfa Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 10 Nov 2022 12:09:51 +0000 Subject: [PATCH 3/4] jobs: add log scopes to tests Release note: None --- pkg/jobs/delegate_control_test.go | 7 +++++++ pkg/jobs/lease_test.go | 2 ++ pkg/jobs/registry_test.go | 2 ++ 3 files changed, 11 insertions(+) diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 0272f38acec2..e6f2ceb045b5 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -24,12 +24,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestScheduleControl(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelper(t) defer cleanup() @@ -139,6 +142,8 @@ func TestScheduleControl(t *testing.T) { func TestJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil) defer cleanup() @@ -247,6 +252,7 @@ func TestJobsControlForSchedules(t *testing.T) { // jobs prior to executing the control command. 
func TestFilterJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { @@ -327,6 +333,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { func TestJobControlByType(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { diff --git a/pkg/jobs/lease_test.go b/pkg/jobs/lease_test.go index ba9c2d027fcf..816882de5ee9 100644 --- a/pkg/jobs/lease_test.go +++ b/pkg/jobs/lease_test.go @@ -20,12 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestJobsTableClaimFamily(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index e9f0c18c4a39..d8aad354bd0b 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -973,6 +973,7 @@ func TestRunWithoutLoop(t *testing.T) { func TestJobIdleness(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() intervalOverride := time.Millisecond @@ -1111,6 +1112,7 @@ func TestJobIdleness(t *testing.T) { // allow other job registries in the cluster to claim and run this job. 
func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) intervalOverride := time.Millisecond s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ From 4e6730e50225091fe880afebf4858ac145049c05 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 14 Nov 2022 13:24:03 -0800 Subject: [PATCH 4/4] colflow: temporarily disable test assertion about closers Release note: None --- pkg/sql/colflow/vectorized_flow.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 3a8e7dc0ed21..ec5aa8289076 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -384,17 +384,18 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // This cleans up all the memory and disk monitoring of the vectorized flow. f.creator.cleanup(ctx) - if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { - // Check that all closers have been closed. Note that we don't check - // this in case the flow was never started in the first place (it is ok - // to not check this since closers haven't allocated any resources in - // such a case). We also don't check when the panic injection is - // enabled since then Close() might be legitimately not called (if a - // panic is injected in Init() of the wrapped operator). - if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { - colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) - } - } + // TODO(yuzefovich): uncomment this once the assertion is no longer flaky. 
+ //if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { + // // Check that all closers have been closed. Note that we don't check + // // this in case the flow was never started in the first place (it is ok + // // to not check this since closers haven't allocated any resources in + // // such a case). We also don't check when the panic injection is + // // enabled since then Close() might be legitimately not called (if a + // // panic is injected in Init() of the wrapped operator). + // if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { + // colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) + // } + //} f.tempStorage.Lock() created := f.tempStorage.path != ""