Skip to content

Commit

Permalink
Merge #52494 #52568
Browse files Browse the repository at this point in the history
52494: sql: add event logging for type DDL statements r=rohany a=rohany

Fixes #52411.

This commit adds event logging for type DDL statements.

Release note: None

52568: bulkio: Hook backup job resume to notify schedules upon completion. r=miretskiy a=miretskiy

Notify scheduled jobs system when backup job completes.

Release Notes: None

Co-authored-by: Rohan Yadav <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed Aug 11, 2020
3 parents 29617e6 + a242e92 + 179fc2c commit a7c61be
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 72 deletions.
25 changes: 25 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
Expand Down Expand Up @@ -570,9 +571,27 @@ func (b *backupResumer) Resume(
}
}

b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg().InternalExecutor)

return nil
}

// maybeNotifyScheduledJobCompletion reports jobStatus for this backup job to
// the schedule that created it. It does nothing when the job has no recorded
// creator, or when the creator is not the scheduled jobs system. A failure to
// deliver the notification is logged as a warning and not returned.
func (b *backupResumer) maybeNotifyScheduledJobCompletion(
	ctx context.Context, jobStatus jobs.Status, ex sqlutil.InternalExecutor,
) {
	// Only jobs started by a schedule have anybody to notify.
	createdBy := b.job.CreatedBy()
	if createdBy == nil || createdBy.Name != jobs.CreatedByScheduledJobs {
		return
	}

	jobID := *b.job.ID()
	scheduleID := createdBy.ID

	err := jobs.NotifyJobTermination(
		ctx, nil /* env */, jobID, jobStatus, scheduleID, ex, nil)
	if err == nil {
		return
	}
	log.Warningf(ctx,
		"failed to notify schedule %d of completion of job %d; err=%s",
		scheduleID, jobID, err)
}

func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
details := b.job.Details().(jobspb.BackupDetails)
var backupManifest BackupManifest
Expand All @@ -593,6 +612,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
defer b.maybeNotifyScheduledJobCompletion(
ctx,
jobs.StatusFailed,
phs.(sql.PlanHookState).ExecCfg().InternalExecutor,
)

telemetry.Count("backup.total.failed")
telemetry.CountBucketed("backup.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))
Expand Down
7 changes: 1 addition & 6 deletions pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,7 @@ func (se *scheduledBackupExecutor) ExecuteJob(

// NotifyJobTermination implements jobs.ScheduledJobExecutor interface.
func (se *scheduledBackupExecutor) NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *jobs.JobMetadata,
sj *jobs.ScheduledJob,
txn *kv.Txn,
ctx context.Context, jobID int64, jobStatus jobs.Status, schedule *jobs.ScheduledJob, txn *kv.Txn,
) error {
return errors.New("unimplemented yet")
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/jobs/executor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,11 @@ func (e *inlineScheduledJobExecutor) ExecuteJob(

// NotifyJobTermination implements ScheduledJobExecutor interface.
func (e *inlineScheduledJobExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
schedule *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, _ *kv.Txn,
) error {
// For now, only interested in failed status.
if md.Status == StatusFailed {
DefaultHandleFailedRun(schedule, md.ID, nil)
if jobStatus == StatusFailed {
DefaultHandleFailedRun(schedule, jobID, nil)
}
return nil
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/jobs/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ func TestInlineExecutorFailedJobsHandling(t *testing.T) {
require.NoError(t, j.Create(ctx, h.cfg.InternalExecutor, nil))

// Pretend we failed running; we expect job to be rescheduled.
md := &JobMetadata{
ID: 123,
Status: "failed",
}

require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, j.ScheduleID(), nil))
require.NoError(t, NotifyJobTermination(
ctx, h.env, 123, StatusFailed, j.ScheduleID(), h.cfg.InternalExecutor, nil))

// Verify nextRun updated
loaded := h.loadSchedule(t, j.ScheduleID())
Expand Down
7 changes: 1 addition & 6 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,7 @@ func (n *recordScheduleExecutor) ExecuteJob(
}

func (n *recordScheduleExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
_ *JobMetadata,
_ *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn,
) error {
return nil
}
Expand Down
21 changes: 8 additions & 13 deletions pkg/jobs/scheduled_job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ type ScheduledJobExecutor interface {
// Modifications to the ScheduledJob object will be persisted.
NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
jobID int64,
jobStatus Status,
schedule *ScheduledJob,
txn *kv.Txn,
) error
Expand Down Expand Up @@ -94,36 +93,32 @@ func DefaultHandleFailedRun(schedule *ScheduledJob, jobID int64, err error) {
// with the job status changes.
func NotifyJobTermination(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
jobID int64,
jobStatus Status,
scheduleID int64,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
if !md.Status.Terminal() {
return errors.Newf(
"job completion expects terminal state, found %s instead for job %d", md.Status, md.ID)
}

if env == nil {
env = scheduledjobs.ProdJobSchedulerEnv
}

// Get the executor for this schedule.
schedule, executor, err := lookupScheduleAndExecutor(
ctx, env, cfg.InternalExecutor, scheduleID, txn)
ctx, env, ex, scheduleID, txn)
if err != nil {
return err
}

// Delegate handling of the job termination to the executor.
err = executor.NotifyJobTermination(ctx, cfg, env, md, schedule, txn)
err = executor.NotifyJobTermination(ctx, jobID, jobStatus, schedule, txn)
if err != nil {
return err
}

// Update this schedule in case executor made changes to it.
return schedule.Update(ctx, cfg.InternalExecutor, txn)
return schedule.Update(ctx, ex, txn)
}

func lookupScheduleAndExecutor(
Expand Down
35 changes: 4 additions & 31 deletions pkg/jobs/scheduled_job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -39,14 +38,9 @@ func (s *statusTrackingExecutor) ExecuteJob(
}

func (s *statusTrackingExecutor) NotifyJobTermination(
_ context.Context,
_ *scheduledjobs.JobExecutionConfig,
_ scheduledjobs.JobSchedulerEnv,
md *JobMetadata,
_ *ScheduledJob,
_ *kv.Txn,
ctx context.Context, jobID int64, jobStatus Status, schedule *ScheduledJob, txn *kv.Txn,
) error {
s.counts[md.Status]++
s.counts[jobStatus]++
return nil
}

Expand All @@ -56,23 +50,6 @@ func newStatusTrackingExecutor() *statusTrackingExecutor {
return &statusTrackingExecutor{counts: make(map[Status]int)}
}

// TestNotifyJobTerminationExpectsTerminalState checks that
// NotifyJobTermination returns an error when handed job metadata whose status
// is one of the non-terminal states listed below.
func TestNotifyJobTerminationExpectsTerminalState(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	nonTerminalStatuses := []Status{
		StatusPending, StatusRunning, StatusPaused, StatusReverting,
		StatusCancelRequested, StatusPauseRequested,
	}

	for _, status := range nonTerminalStatuses {
		md := &JobMetadata{
			ID:     123,
			Status: status,
		}
		// No config, environment, or transaction is supplied; the call is
		// only expected to report an error.
		err := NotifyJobTermination(context.Background(), nil, nil, md, 321, nil)
		require.Error(t, err)
	}
}

func TestScheduledJobExecutorRegistration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -103,12 +80,8 @@ func TestJobTerminationNotification(t *testing.T) {

// Pretend it completes multiple runs with terminal statuses.
for _, s := range []Status{StatusCanceled, StatusFailed, StatusSucceeded} {
md := &JobMetadata{
ID: 123,
Status: s,
Payload: &jobspb.Payload{},
}
require.NoError(t, NotifyJobTermination(ctx, h.cfg, h.env, md, schedule.ScheduleID(), nil))
require.NoError(t, NotifyJobTermination(
ctx, h.env, 123, s, schedule.ScheduleID(), h.cfg.InternalExecutor, nil))
}

// Verify counts.
Expand Down
20 changes: 19 additions & 1 deletion pkg/sql/alter_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,25 @@ func (n *alterTypeNode) startExec(params runParams) error {
if err != nil {
return err
}
return n.desc.Validate(params.ctx, params.p.txn, params.ExecCfg().Codec)

// Validate the type descriptor after the changes.
if err := n.desc.Validate(params.ctx, params.p.txn, params.ExecCfg().Codec); err != nil {
return err
}

// Write a log event.
return MakeEventLogger(params.p.ExecCfg()).InsertEventRecord(
params.ctx,
params.p.txn,
EventLogAlterType,
int32(n.desc.ID),
int32(params.extendedEvalCtx.NodeID.SQLInstanceID()),
struct {
TypeName string
Statement string
User string
}{n.desc.Name, tree.AsStringWithFQNames(n.n, params.Ann()), params.p.User()},
)
}

func (p *planner) addEnumValue(
Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/create_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,29 @@ func (p *planner) createEnum(params runParams, n *tree.CreateType) error {
typeDesc.ArrayTypeID = arrayTypeID

// Now create the type after the implicit array type has been created.
return p.createDescriptorWithID(
if err := p.createDescriptorWithID(
params.ctx,
typeKey.Key(params.ExecCfg().Codec),
id,
typeDesc,
params.EvalContext().Settings,
tree.AsStringWithFQNames(n, params.Ann()),
); err != nil {
return err
}

// Log the event.
return MakeEventLogger(p.ExecCfg()).InsertEventRecord(
params.ctx,
p.txn,
EventLogCreateType,
int32(typeDesc.GetID()),
int32(p.ExtendedEvalContext().NodeID.SQLInstanceID()),
struct {
TypeName string
Statement string
User string
}{typeName.FQString(), tree.AsStringWithFQNames(n, params.Ann()), p.User()},
)
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/drop_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ func (n *dropTypeNode) startExec(params runParams) error {
if err := params.p.dropTypeImpl(params.ctx, typ, tree.AsStringWithFQNames(n.n, params.Ann()), true /* queueJob */); err != nil {
return err
}
// Log a Drop Type event.
if err := MakeEventLogger(params.extendedEvalCtx.ExecCfg).InsertEventRecord(
params.ctx,
params.p.txn,
EventLogDropType,
int32(typ.ID),
int32(params.extendedEvalCtx.NodeID.SQLInstanceID()),
struct {
TypeName string
Statement string
User string
}{typ.Name, tree.AsStringWithFQNames(n.n, params.Ann()), params.SessionData().User},
); err != nil {
return err
}
}
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ const (
// initiated schema change rollback has completed.
EventLogFinishSchemaRollback EventLogType = "finish_schema_change_rollback"

// EventLogCreateType is recorded when a type is created.
EventLogCreateType EventLogType = "create_type"
// EventLogDropType is recorded when a type is dropped.
EventLogDropType EventLogType = "drop_type"
// EventLogAlterType is recorded when a type is altered.
EventLogAlterType EventLogType = "alter_type"

// EventLogNodeJoin is recorded when a node joins the cluster.
EventLogNodeJoin EventLogType = "node_join"
// EventLogNodeRestart is recorded when an existing node rejoins the cluster
Expand Down

0 comments on commit a7c61be

Please sign in to comment.