From afa9b6d6f33c040ee935ced67fd1028d5f0a4151 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 26 Mar 2021 09:51:38 -0400 Subject: [PATCH] log,importccl,backupccl: add event logs for import and restore jobs This change adds a new `CommonJobEventDetails` and two implementations of the interface in `Import` and `Restore`, to structure the events emitted during different phases of the job execution. Both Import/Restore will emit logs at the following stages: - When the job has completed preparing and is beginning execution. - When the job has succeeded. - When the job has started reverting. - When the job has finished reverting and is in a terminal failed state. Release note (sql change): IMPORT and RESTORE now emit structured events to the OPS channel. Refer to the generated documentation for details. --- Makefile | 3 +- docs/generated/eventlog.md | 59 ++ pkg/ccl/backupccl/BUILD.bazel | 4 + pkg/ccl/backupccl/backup_test.go | 63 ++ pkg/ccl/backupccl/restore_job.go | 34 +- pkg/ccl/backupccl/testutils.go | 68 +++ pkg/ccl/importccl/BUILD.bazel | 1 + pkg/ccl/importccl/import_stmt.go | 55 +- pkg/ccl/importccl/import_stmt_test.go | 91 ++- pkg/jobs/jobs.go | 20 + pkg/sql/event_log.go | 40 ++ pkg/ui/src/util/eventTypes.ts | 11 + pkg/ui/src/util/events.ts | 6 + pkg/util/log/eventpb/BUILD.bazel | 1 + .../eventpb/eventlog_channels_generated.go | 6 + pkg/util/log/eventpb/events.go | 12 + pkg/util/log/eventpb/events.pb.go | 475 ++++++++++++++- pkg/util/log/eventpb/events.proto | 27 +- pkg/util/log/eventpb/gen.go | 10 + pkg/util/log/eventpb/job_events.pb.go | 564 ++++++++++++++++++ pkg/util/log/eventpb/job_events.proto | 45 ++ pkg/util/log/eventpb/json_encode_generated.go | 94 +++ 22 files changed, 1651 insertions(+), 38 deletions(-) create mode 100644 pkg/ccl/backupccl/testutils.go create mode 100644 pkg/util/log/eventpb/job_events.pb.go create mode 100644 pkg/util/log/eventpb/job_events.proto diff --git a/Makefile b/Makefile index 7c2e793787af..8741d4eef016 100644 --- a/Makefile +++ b/Makefile @@ -1576,7 +1576,8 @@ EVENTLOG_PROTOS = \ pkg/util/log/eventpb/zone_events.proto \ pkg/util/log/eventpb/session_events.proto \ pkg/util/log/eventpb/sql_audit_events.proto \ - pkg/util/log/eventpb/cluster_events.proto + pkg/util/log/eventpb/cluster_events.proto \ + pkg/util/log/eventpb/job_events.proto LOGSINKDOC_DEP = pkg/util/log/logconfig/config.go diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 85903e7dc598..94e0d1f8f375 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -137,6 +137,65 @@ after being offline. | `StartedAt` | The time when this node was last started. | no | | `LastUp` | The approximate last time the node was up before the last restart. | no | +## Job events + +Events in this category pertain to long-running jobs that are orchestrated by +a node's job registry. These system processes can create and/or modify stored +objects during the course of their execution. + +A job might choose to emit multiple events during its execution when +transitioning from one "state" to another. +Egs: IMPORT/RESTORE will emit events on job creation and successful +completion. If the job fails, events will be emitted on job creation, +failure, and successful revert. + +Events in this category are logged to channel OPS. + + +### `import` + +An event of type `import` is recorded when an import job is created and successful completion. +If the job fails, events will be emitted on job creation, failure, and +successful revert. + + + + +#### Common fields + +| Field | Description | Sensitive | +|--|--|--| +| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no | +| `EventType` | The type of the event. | no | +| `JobID` | The ID of the job that triggered the event. | no | +| `JobType` | The type of the job that triggered the event. | no | +| `Description` | A description of the job that triggered the event. Some jobs populate the description with an approximate representation of the SQL statement run to create the job. | yes | +| `User` | The user account that triggered the event. | yes | +| `DescriptorIDs` | The object descriptors affected by the job. Set to zero for operations that don't affect descriptors. | yes | +| `Status` | The status of the job that triggered the event. This allows the job to indicate which phase execution it is in when the event is triggered. | no | + +### `restore` + +An event of type `restore` is recorded when a restore job is created and successful completion. +If the job fails, events will be emitted on job creation, failure, and +successful revert. + + + + +#### Common fields + +| Field | Description | Sensitive | +|--|--|--| +| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no | +| `EventType` | The type of the event. | no | +| `JobID` | The ID of the job that triggered the event. | no | +| `JobType` | The type of the job that triggered the event. | no | +| `Description` | A description of the job that triggered the event. Some jobs populate the description with an approximate representation of the SQL statement run to create the job. | yes | +| `User` | The user account that triggered the event. | yes | +| `DescriptorIDs` | The object descriptors affected by the job. Set to zero for operations that don't affect descriptors. | yes | +| `Status` | The status of the job that triggered the event. This allows the job to indicate which phase execution it is in when the event is triggered. | no | + ## Miscellaneous SQL events Events in this category report miscellaneous SQL events. diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 74ad19543be1..653ddaa65e81 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "split_and_scatter_processor.go", "system_schema.go", "targets.go", + "testutils.go", ], embed = [":backupccl_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl", @@ -86,11 +87,13 @@ go_library( "//pkg/storage", "//pkg/storage/cloud", "//pkg/storage/cloudimpl", + "//pkg/testutils", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/interval", "//pkg/util/log", + "//pkg/util/log/eventpb", "//pkg/util/metric", "//pkg/util/protoutil", "//pkg/util/retry", @@ -104,6 +107,7 @@ go_library( "@com_github_gogo_protobuf//types", "@com_github_gorhill_cronexpr//:cronexpr", "@com_github_lib_pq//oid", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 4a68e4637e8f..2bb29bb7f34b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -7707,3 +7707,66 @@ table_name FROM [SHOW TABLES] ORDER BY schema_name, table_name` return nil }) } + +func TestRestoreJobEventLogging(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.ScopeWithoutShowLogs(t).Close(t) + + defer jobs.TestingSetProgressThresholds()() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)() + + baseDir := "testdata" + args := base.TestServerArgs{ExternalIODir: baseDir} + params := base.TestClusterArgs{ServerArgs: args} + _, tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1, + InitManualReplication, params) + defer cleanupFn() + + var forceFailure bool + for i := range tc.Servers { + tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.beforePublishingDescriptors = func() error { + if forceFailure { + return errors.New("testing injected failure") + } + return nil + } + return r + }, + } + } + + sqlDB.Exec(t, `CREATE DATABASE r1`) + sqlDB.Exec(t, `CREATE TABLE r1.foo (id INT)`) + sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/eventlogging'`) + sqlDB.Exec(t, `DROP DATABASE r1`) + + beforeRestore := timeutil.Now() + restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/eventlogging'` + + var jobID int64 + var unused interface{} + sqlDB.QueryRow(t, restoreQuery).Scan(&jobID, &unused, &unused, &unused, &unused, + &unused) + + expectedStatus := []string{string(jobs.StatusSucceeded), string(jobs.StatusRunning)} + CheckEmittedEvents(t, expectedStatus, beforeRestore.UnixNano(), jobID, "restore", + "RESTORE") + + sqlDB.Exec(t, `DROP DATABASE r1`) + + // Now let's test the events that are emitted when a job fails. + forceFailure = true + beforeSecondRestore := timeutil.Now() + sqlDB.ExpectErrSucceedsSoon(t, "testing injected failure", restoreQuery) + + row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE status = 'failed'") + row.Scan(&jobID) + + expectedStatus = []string{string(jobs.StatusFailed), string(jobs.StatusReverting), + string(jobs.StatusRunning)} + CheckEmittedEvents(t, expectedStatus, beforeSecondRestore.UnixNano(), jobID, + "restore", "RESTORE") +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 5fd731f48d84..acf3d98381dc 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -49,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -1345,6 +1346,9 @@ func createImportingDescriptors( // Update the job once all descs have been prepared for ingestion. err := r.job.SetDetails(ctx, txn, details) + // Emit to the event log now that the job has finished preparing descs. + emitRestoreJobEvent(ctx, p, jobs.StatusRunning, r.job) + return err }) if err != nil { @@ -1507,6 +1511,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error return err } } + emitRestoreJobEvent(ctx, p, jobs.StatusSucceeded, r.job) return nil } @@ -1613,6 +1618,9 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error r.restoreStats = resTotal + // Emit an event now that the restore job has completed. + emitRestoreJobEvent(ctx, p, jobs.StatusSucceeded, r.job) + // Collect telemetry. { numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip) @@ -1850,11 +1858,28 @@ func (r *restoreResumer) publishDescriptors( return nil } +func emitRestoreJobEvent( + ctx context.Context, p sql.JobExecContext, status jobs.Status, job *jobs.Job, +) { + // Emit to the event log now that we have completed the prepare step. + var restoreEvent eventpb.Restore + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return sql.LogEventForJobs(ctx, p.ExecCfg(), txn, &restoreEvent, int64(job.ID()), + job.Payload(), p.User(), status) + }); err != nil { + log.Warningf(ctx, "failed to log event: %v", err) + } +} + // OnFailOrCancel is part of the jobs.Resumer interface. Removes KV data that // has been committed from a restore that has failed or been canceled. It does // this by adding the table descriptors in DROP state, which causes the schema // change stuff to delete the keys in the background. func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { + p := execCtx.(sql.JobExecContext) + // Emit to the event log that the job has started reverting. + emitRestoreJobEvent(ctx, p, jobs.StatusReverting, r.job) + telemetry.Count("restore.total.failed") telemetry.CountBucketed("restore.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) @@ -1862,7 +1887,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{} details := r.job.Details().(jobspb.RestoreDetails) execCfg := execCtx.(sql.JobExecContext).ExecCfg() - return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor, + err := descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor, execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { for _, tenant := range details.Tenants { tenant.State = descpb.TenantInfo_DROP @@ -1874,6 +1899,13 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{} } return r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol) }) + if err != nil { + return err + } + + // Emit to the event log that the job has completed reverting. + emitRestoreJobEvent(ctx, p, jobs.StatusFailed, r.job) + return nil } // dropDescriptors implements the OnFailOrCancel logic. diff --git a/pkg/ccl/backupccl/testutils.go b/pkg/ccl/backupccl/testutils.go new file mode 100644 index 000000000000..706353dfbf2a --- /dev/null +++ b/pkg/ccl/backupccl/testutils.go @@ -0,0 +1,68 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "encoding/json" + "math" + "regexp" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// CheckEmittedEvents is a helper method used by IMPORT and RESTORE tests to +// ensure events are emitted deterministically. +func CheckEmittedEvents( + t *testing.T, + expectedStatus []string, + startTime int64, + jobID int64, + expectedMessage, expectedJobType string, +) { + // Check that the structured event was logged. + testutils.SucceedsSoon(t, func() error { + log.Flush() + entries, err := log.FetchEntriesFromFiles(startTime, + math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) + if err != nil { + t.Fatal(err) + } + foundEntry := false + for i, e := range entries { + if !strings.Contains(e.Message, expectedMessage) { + continue + } + foundEntry = true + // TODO(knz): Remove this when crdb-v2 becomes the new format. + e.Message = strings.TrimPrefix(e.Message, "Structured entry:") + // crdb-v2 starts json with an equal sign. + e.Message = strings.TrimPrefix(e.Message, "=") + jsonPayload := []byte(e.Message) + var ev eventpb.CommonJobEventDetails + if err := json.Unmarshal(jsonPayload, &ev); err != nil { + t.Errorf("unmarshalling %q: %v", e.Message, err) + } + require.Equal(t, expectedJobType, ev.JobType) + require.Equal(t, jobID, ev.JobID) + require.Equal(t, expectedStatus[i], ev.Status) + } + if !foundEntry { + return errors.New("structured entry for import not found in log") + } + return nil + }) +} + +var cmLogRe = regexp.MustCompile(`event_log\.go`) diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index f62c6884da05..f6a39e743c16 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -73,6 +73,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", + "//pkg/util/log/eventpb", "//pkg/util/protoutil", "//pkg/util/retry", "//pkg/util/timeutil", diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 7210bbf794e0..397e6f760b71 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -59,6 +59,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -1853,6 +1854,19 @@ type preparedSchemaMetadata struct { queuedSchemaJobs []jobspb.JobID } +func emitImportJobEvent( + ctx context.Context, p sql.JobExecContext, status jobs.Status, job *jobs.Job, +) { + // Emit to the event log now that we have completed the prepare step. + var importEvent eventpb.Import + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return sql.LogEventForJobs(ctx, p.ExecCfg(), txn, &importEvent, int64(job.ID()), + job.Payload(), p.User(), status) + }); err != nil { + log.Warningf(ctx, "failed to log event: %v", err) + } +} + // Resume is part of the jobs.Resumer interface. func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { p := execCtx.(sql.JobExecContext) @@ -1907,7 +1921,36 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { // Update the job details now that the schemas and table descs have // been "prepared". - return r.job.SetDetails(ctx, txn, preparedDetails) + err = r.job.SetDetails(ctx, txn, preparedDetails) + if err != nil { + return err + } + + // Update the job record with the schema and table IDs we will be + // ingesting into. + err = r.job.SetDescriptorIDs(ctx, txn, func(ctx context.Context, + descIDs []descpb.ID) ([]descpb.ID, error) { + var descriptorIDs []descpb.ID + if descIDs == nil { + for _, schema := range preparedDetails.Schemas { + descriptorIDs = append(descriptorIDs, schema.Desc.GetID()) + } + for _, table := range preparedDetails.Tables { + descriptorIDs = append(descriptorIDs, table.Desc.GetID()) + } + return descriptorIDs, nil + } + log.Warningf(ctx, "unexpected descriptor IDs %+v set in import job %d", descIDs, + r.job.ID()) + return nil, nil + }) + if err != nil { + // We don't want to fail the import if we fail to update the + // descriptor IDs as this is only for observability. + log.Warningf(ctx, "failed to update import job %d with target descriptor IDs", + r.job.ID()) + } + return nil }) if err != nil { return err @@ -1926,6 +1969,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { // Re-initialize details after prepare step. details = r.job.Details().(jobspb.ImportDetails) + emitImportJobEvent(ctx, p, jobs.StatusRunning, r.job) } // Create a mapping from schemaID to schemaName. @@ -2041,6 +2085,8 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + emitImportJobEvent(ctx, p, jobs.StatusSucceeded, r.job) + addToFileFormatTelemetry(details.Format.Format.String(), "succeeded") telemetry.CountBucketed("import.rows", r.res.Rows) const mb = 1 << 20 @@ -2232,6 +2278,10 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor // stuff to delete the keys in the background. func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { p := execCtx.(sql.JobExecContext) + + // Emit to the event log that the job has started reverting. + emitImportJobEvent(ctx, p, jobs.StatusReverting, r.job) + details := r.job.Details().(jobspb.ImportDetails) addToFileFormatTelemetry(details.Format.Format.String(), "failed") cfg := execCtx.(sql.JobExecContext).ExecCfg() @@ -2270,6 +2320,9 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) } } + // Emit to the event log that the job has completed reverting. + emitImportJobEvent(ctx, p, jobs.StatusFailed, r.job) + return nil } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 2c03d53efd1a..8f79d6e2fd40 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -1983,9 +1983,20 @@ func TestImportCSVStmt(t *testing.T) { jobPrefix := fmt.Sprintf(`IMPORT TABLE %s.public.t (a INT8 PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b))`, intodb) + var intodbID descpb.ID + sqlDB.QueryRow(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = '%s'`, + intodb)).Scan(&intodbID) + var publicSchemaID descpb.ID + sqlDB.QueryRow(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = '%s'`, + tree.PublicSchema)).Scan(&publicSchemaID) + var tableID int64 + sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE "parentID" = $1 AND "parentSchemaID" = $2`, + intodbID, publicSchemaID).Scan(&tableID) + if err := jobutils.VerifySystemJob(t, sqlDB, testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{ - Username: security.RootUserName(), - Description: fmt.Sprintf(jobPrefix+` CSV DATA (%s)`+tc.jobOpts, strings.ReplaceAll(strings.Join(tc.files, ", "), "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")), + Username: security.RootUserName(), + Description: fmt.Sprintf(jobPrefix+` CSV DATA (%s)`+tc.jobOpts, strings.ReplaceAll(strings.Join(tc.files, ", "), "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")), + DescriptorIDs: []descpb.ID{descpb.ID(tableID)}, }); err != nil { t.Fatal(err) } @@ -2842,6 +2853,9 @@ func TestImportIntoCSV(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`) defer sqlDB.Exec(t, `DROP TABLE t`) + var tableID int64 + sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 't'`).Scan(&tableID) + var unused string var restored struct { rows, idx, bytes int @@ -2869,8 +2883,9 @@ func TestImportIntoCSV(t *testing.T) { jobPrefix := `IMPORT INTO defaultdb.public.t(a, b)` if err := jobutils.VerifySystemJob(t, sqlDB, testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{ - Username: security.RootUserName(), - Description: fmt.Sprintf(jobPrefix+` CSV DATA (%s)`+tc.jobOpts, strings.ReplaceAll(strings.Join(tc.files, ", "), "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")), + Username: security.RootUserName(), + Description: fmt.Sprintf(jobPrefix+` CSV DATA (%s)`+tc.jobOpts, strings.ReplaceAll(strings.Join(tc.files, ", "), "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted")), + DescriptorIDs: []descpb.ID{descpb.ID(tableID)}, }); err != nil { t.Fatal(err) } @@ -7003,3 +7018,71 @@ func TestDetachedImport(t *testing.T) { sqlDB.QueryRow(t, importIntoQueryDetached, simpleOcf).Scan(&jobID) waitForJobResult(t, tc, jobID, jobs.StatusFailed) } + +func TestImportJobEventLogging(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.ScopeWithoutShowLogs(t).Close(t) + + defer jobs.TestingSetProgressThresholds()() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)() + + const ( + nodes = 3 + ) + ctx := context.Background() + baseDir := filepath.Join("testdata", "avro") + args := base.TestServerArgs{ExternalIODir: baseDir} + params := base.TestClusterArgs{ServerArgs: args} + tc := testcluster.StartTestCluster(t, nodes, params) + defer tc.Stopper().Stop(ctx) + + var forceFailure bool + for i := range tc.Servers { + tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*importResumer) + r.testingKnobs.afterImport = func(_ backupccl.RowCount) error { + if forceFailure { + return errors.New("testing injected failure") + } + return nil + } + return r + }, + } + } + + connDB := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(connDB) + + simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf") + + // First, let's test the happy path. Start a job, allow it to succeed and check + // the event log for the entries. + sqlDB.Exec(t, `CREATE DATABASE foo; SET DATABASE = foo`) + beforeImport := timeutil.Now() + importQuery := `IMPORT TABLE simple (i INT8 PRIMARY KEY, s text, b bytea) AVRO DATA ($1)` + + var jobID int64 + var unused interface{} + sqlDB.QueryRow(t, importQuery, simpleOcf).Scan(&jobID, &unused, &unused, &unused, &unused, + &unused) + + expectedStatus := []string{string(jobs.StatusSucceeded), string(jobs.StatusRunning)} + backupccl.CheckEmittedEvents(t, expectedStatus, beforeImport.UnixNano(), jobID, "import", "IMPORT") + + sqlDB.Exec(t, `DROP TABLE simple`) + + // Now let's test the events that are emitted when a job fails. + forceFailure = true + beforeSecondImport := timeutil.Now() + secondImport := `IMPORT TABLE simple (i INT8 PRIMARY KEY, s text, b bytea) AVRO DATA ($1)` + sqlDB.ExpectErrSucceedsSoon(t, "testing injected failure", secondImport, simpleOcf) + + row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE status = 'failed'") + row.Scan(&jobID) + + expectedStatus = []string{string(jobs.StatusFailed), string(jobs.StatusReverting), + string(jobs.StatusRunning)} + backupccl.CheckEmittedEvents(t, expectedStatus, beforeSecondImport.UnixNano(), jobID, "import", "IMPORT") +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 6a2e075bd71c..6993f8b4f351 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -288,6 +288,22 @@ func (j *Job) SetDescription(ctx context.Context, txn *kv.Txn, updateFn Descript }) } +// SetDescriptorIDs updates the description of a created job. +func (j *Job) SetDescriptorIDs( + ctx context.Context, txn *kv.Txn, updateFn DescriptorIdsUpdateFn, +) error { + return j.Update(ctx, txn, func(_ *kv.Txn, md JobMetadata, ju *JobUpdater) error { + prev := md.Payload.DescriptorIDs + descIDs, err := updateFn(ctx, prev) + if err != nil { + return err + } + md.Payload.DescriptorIDs = descIDs + ju.UpdatePayload(md.Payload) + return nil + }) +} + // SetNonCancelable updates the NonCancelable field of a created job. func (j *Job) SetNonCancelable( ctx context.Context, txn *kv.Txn, updateFn NonCancelableUpdateFn, @@ -312,6 +328,10 @@ type RunningStatusFn func(ctx context.Context, details jobspb.Details) (RunningS // given its current one. type DescriptionUpdateFn func(ctx context.Context, description string) (string, error) +// DescriptorIdsUpdateFn is a callback that computes a job's descriptor IDs given +// its current one. +type DescriptorIdsUpdateFn func(ctx context.Context, descIDs []descpb.ID) ([]descpb.ID, error) + // NonCancelableUpdateFn is a callback that computes a job's non-cancelable // status given its current one. type NonCancelableUpdateFn func(ctx context.Context, nonCancelable bool) bool diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 550dc6923673..034de033c3d7 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -15,6 +15,8 @@ import ( "encoding/json" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -142,6 +144,44 @@ func logEventInternalForSQLStatements( ) } +// LogEventForJobs emits a cluster event in the context of a job. +func LogEventForJobs( + ctx context.Context, + execCfg *ExecutorConfig, + txn *kv.Txn, + event eventpb.EventPayload, + jobID int64, + payload jobspb.Payload, + user security.SQLUsername, + status jobs.Status, +) error { + event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime + jobCommon, ok := event.(eventpb.EventWithCommonJobPayload) + if !ok { + return errors.AssertionFailedf("unknown event type: %T", event) + } + m := jobCommon.CommonJobDetails() + m.JobID = jobID + m.JobType = payload.Type().String() + m.User = user.Normalized() + m.Status = string(status) + for _, id := range payload.DescriptorIDs { + m.DescriptorIDs = append(m.DescriptorIDs, uint32(id)) + } + m.Description = payload.Description + + // Delegate the storing of the event to the regular event logic. + return InsertEventRecord( + ctx, execCfg.InternalExecutor, + txn, + 0, /* targetID */ + int32(execCfg.NodeID.SQLInstanceID()), + false, /* skipExternalLog */ + event, + false, /* onlyLog */ + ) +} + var eventLogEnabled = settings.RegisterBoolSetting( "server.eventlog.enabled", "if set, logged notable events are also stored in the table system.eventlog", diff --git a/pkg/ui/src/util/eventTypes.ts b/pkg/ui/src/util/eventTypes.ts index 63dc4c5a0562..9b9dbc04540b 100644 --- a/pkg/ui/src/util/eventTypes.ts +++ b/pkg/ui/src/util/eventTypes.ts @@ -99,6 +99,10 @@ export const CREATE_ROLE = "create_role"; export const DROP_ROLE = "drop_role"; // Recorded when a role is altered. export const ALTER_ROLE = "alter_role"; +// Recorded when an import job is in different stages of execution. +export const IMPORT = "import"; +// Recorded when a restore job is in different stages of execution. +export const RESTORE = "restore"; // Node Event Types export const nodeEvents = [ @@ -128,17 +132,20 @@ export const settingsEvents = [ SET_ZONE_CONFIG, REMOVE_ZONE_CONFIG, ]; +export const jobEvents = [IMPORT, RESTORE]; export const allEvents = [ ...nodeEvents, ...databaseEvents, ...tableEvents, ...settingsEvents, + ...jobEvents, ]; const nodeEventSet = _.invert(nodeEvents); const databaseEventSet = _.invert(databaseEvents); const tableEventSet = _.invert(tableEvents); const settingsEventSet = _.invert(settingsEvents); +const jobsEventSet = _.invert(jobEvents); export function isNodeEvent(e: Event): boolean { return !_.isUndefined(nodeEventSet[e.event_type]); @@ -155,3 +162,7 @@ export function isTableEvent(e: Event): boolean { export function isSettingsEvent(e: Event): boolean { return !_.isUndefined(settingsEventSet[e.event_type]); } + +export function isJobsEvent(e: Event): boolean { + return !_.isUndefined(jobsEventSet[e.event_type]); +} diff --git a/pkg/ui/src/util/events.ts b/pkg/ui/src/util/events.ts index c121f9d74db8..639b6b7757f1 100644 --- a/pkg/ui/src/util/events.ts +++ b/pkg/ui/src/util/events.ts @@ -136,6 +136,10 @@ export function getEventDescription(e: Event$Properties): string { return `Role Dropped: User ${info.User} dropped role ${info.RoleName}`; case eventTypes.ALTER_ROLE: return `Role Altered: User ${info.User} altered role ${info.RoleName} with options ${info.Options}`; + case eventTypes.IMPORT: + return `Import Job: User ${info.User} has a job ${info.JobID} running with status ${info.Status}`; + case eventTypes.RESTORE: + return `Restore Job: User ${info.User} has a job ${info.JobID} running with status ${info.Status}`; default: return `Unknown Event Type: ${e.event_type}, content: ${JSON.stringify( info, @@ -182,6 +186,8 @@ export interface EventInfo { DroppedSchemaObjects?: string[]; Grantees?: string; NewDatabaseParent?: string; + JobID?: string; + Status?: string; } export function getDroppedObjectsText(eventInfo: EventInfo): string { diff --git a/pkg/util/log/eventpb/BUILD.bazel b/pkg/util/log/eventpb/BUILD.bazel index 165d6e0963c2..8fa6c36bbf89 100644 --- a/pkg/util/log/eventpb/BUILD.bazel +++ b/pkg/util/log/eventpb/BUILD.bazel @@ -37,6 +37,7 @@ proto_library( "cluster_events.proto", "ddl_events.proto", "events.proto", + "job_events.proto", "misc_sql_events.proto", "privilege_events.proto", "role_events.proto", diff --git a/pkg/util/log/eventpb/eventlog_channels_generated.go b/pkg/util/log/eventpb/eventlog_channels_generated.go index 314ddb5c247a..3882f845bdd8 100644 --- a/pkg/util/log/eventpb/eventlog_channels_generated.go +++ b/pkg/util/log/eventpb/eventlog_channels_generated.go @@ -22,6 +22,12 @@ func (m *NodeRecommissioned) LoggingChannel() logpb.Channel { return logpb.Chann // LoggingChannel implements the EventPayload interface. func (m *NodeRestart) LoggingChannel() logpb.Channel { return logpb.Channel_OPS } +// LoggingChannel implements the EventPayload interface. +func (m *Import) LoggingChannel() logpb.Channel { return logpb.Channel_OPS } + +// LoggingChannel implements the EventPayload interface. +func (m *Restore) LoggingChannel() logpb.Channel { return logpb.Channel_OPS } + // LoggingChannel implements the EventPayload interface. func (m *SetClusterSetting) LoggingChannel() logpb.Channel { return logpb.Channel_DEV } diff --git a/pkg/util/log/eventpb/events.go b/pkg/util/log/eventpb/events.go index 60e754cca315..74b3e9351a5c 100644 --- a/pkg/util/log/eventpb/events.go +++ b/pkg/util/log/eventpb/events.go @@ -80,3 +80,15 @@ func (m *CommonSchemaChangeEventDetails) CommonSchemaChangeDetails() *CommonSche var _ EventWithCommonSchemaChangePayload = (*FinishSchemaChange)(nil) var _ EventWithCommonSchemaChangePayload = (*ReverseSchemaChange)(nil) var _ EventWithCommonSchemaChangePayload = (*FinishSchemaChangeRollback)(nil) + +// EventWithCommonJobPayload is implemented by CommonSQLEventDetails. +type EventWithCommonJobPayload interface { + EventPayload + CommonJobDetails() *CommonJobEventDetails +} + +// CommonJobDetails implements the EventWithCommonJobPayload interface. +func (m *CommonJobEventDetails) CommonJobDetails() *CommonJobEventDetails { return m } + +var _ EventWithCommonJobPayload = (*Import)(nil) +var _ EventWithCommonJobPayload = (*Restore)(nil) diff --git a/pkg/util/log/eventpb/events.pb.go b/pkg/util/log/eventpb/events.pb.go index 1591fe491fd7..6123fd4adfdc 100644 --- a/pkg/util/log/eventpb/events.pb.go +++ b/pkg/util/log/eventpb/events.pb.go @@ -33,7 +33,7 @@ func (m *CommonEventDetails) Reset() { *m = CommonEventDetails{} } func (m *CommonEventDetails) String() string { return proto.CompactTextString(m) } func (*CommonEventDetails) ProtoMessage() {} func (*CommonEventDetails) Descriptor() ([]byte, []int) { - return fileDescriptor_events_584d902029f1b16a, []int{0} + return fileDescriptor_events_1dbe437e52f03141, []int{0} } func (m *CommonEventDetails) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -80,7 +80,7 @@ func (m *CommonSQLEventDetails) Reset() { *m = CommonSQLEventDetails{} } func (m *CommonSQLEventDetails) String() string { return proto.CompactTextString(m) } func (*CommonSQLEventDetails) ProtoMessage() {} func (*CommonSQLEventDetails) Descriptor() ([]byte, []int) { - return fileDescriptor_events_584d902029f1b16a, []int{1} + return fileDescriptor_events_1dbe437e52f03141, []int{1} } func (m *CommonSQLEventDetails) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -105,9 +105,59 @@ func (m *CommonSQLEventDetails) XXX_DiscardUnknown() { var xxx_messageInfo_CommonSQLEventDetails proto.InternalMessageInfo +// CommonJobEventDetails contains the fields common to all job events. +type CommonJobEventDetails struct { + // The ID of the job that triggered the event. + JobID int64 `protobuf:"varint,1,opt,name=job_id,json=jobId,proto3" json:",omitempty"` + // The type of the job that triggered the event. + JobType string `protobuf:"bytes,2,opt,name=job_type,json=jobType,proto3" json:",omitempty" redact:"nonsensitive"` + // A description of the job that triggered the event. Some jobs populate the + // description with an approximate representation of the SQL statement run to + // create the job. + Description string `protobuf:"bytes,3,opt,name=description,proto3" json:",omitempty"` + // The user account that triggered the event. + User string `protobuf:"bytes,4,opt,name=user,proto3" json:",omitempty"` + // The object descriptors affected by the job. Set to zero for operations + // that don't affect descriptors. + DescriptorIDs []uint32 `protobuf:"varint,5,rep,packed,name=descriptor_ids,json=descriptorIds,proto3" json:",omitempty"` + // The status of the job that triggered the event. This allows the job to + // indicate which phase execution it is in when the event is triggered. + Status string `protobuf:"bytes,6,opt,name=status,proto3" json:",omitempty" redact:"nonsensitive"` +} + +func (m *CommonJobEventDetails) Reset() { *m = CommonJobEventDetails{} } +func (m *CommonJobEventDetails) String() string { return proto.CompactTextString(m) } +func (*CommonJobEventDetails) ProtoMessage() {} +func (*CommonJobEventDetails) Descriptor() ([]byte, []int) { + return fileDescriptor_events_1dbe437e52f03141, []int{2} +} +func (m *CommonJobEventDetails) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CommonJobEventDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *CommonJobEventDetails) XXX_Merge(src proto.Message) { + xxx_messageInfo_CommonJobEventDetails.Merge(dst, src) +} +func (m *CommonJobEventDetails) XXX_Size() int { + return m.Size() +} +func (m *CommonJobEventDetails) XXX_DiscardUnknown() { + xxx_messageInfo_CommonJobEventDetails.DiscardUnknown(m) +} + +var xxx_messageInfo_CommonJobEventDetails proto.InternalMessageInfo + func init() { proto.RegisterType((*CommonEventDetails)(nil), "cockroach.util.log.eventpb.CommonEventDetails") proto.RegisterType((*CommonSQLEventDetails)(nil), "cockroach.util.log.eventpb.CommonSQLEventDetails") + proto.RegisterType((*CommonJobEventDetails)(nil), "cockroach.util.log.eventpb.CommonJobEventDetails") } func (m *CommonEventDetails) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -194,6 +244,70 @@ func (m *CommonSQLEventDetails) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *CommonJobEventDetails) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CommonJobEventDetails) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.JobID != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.JobID)) + } + if len(m.JobType) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.JobType))) + i += copy(dAtA[i:], m.JobType) + } + if len(m.Description) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Description))) + i += copy(dAtA[i:], m.Description) + } + if len(m.User) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.User))) + i += copy(dAtA[i:], m.User) + } + if len(m.DescriptorIDs) > 0 { + dAtA2 := make([]byte, len(m.DescriptorIDs)*10) + var j1 int + for _, num := range m.DescriptorIDs { + for num >= 1<<7 { + dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j1++ + } + dAtA2[j1] = uint8(num) + j1++ + } + dAtA[i] = 0x2a + i++ + i = encodeVarintEvents(dAtA, i, uint64(j1)) + i += copy(dAtA[i:], dAtA2[:j1]) + } + if len(m.Status) > 0 { + dAtA[i] = 0x32 + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Status))) + i += copy(dAtA[i:], m.Status) + } + return i, nil +} + func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -249,6 +363,41 @@ func (m *CommonSQLEventDetails) Size() (n int) { return n } +func (m *CommonJobEventDetails) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.JobID != 0 { + n += 1 + sovEvents(uint64(m.JobID)) + } + l = len(m.JobType) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + l = len(m.Description) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + l = len(m.User) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if len(m.DescriptorIDs) > 0 { + l = 0 + for _, e := range m.DescriptorIDs { + l += sovEvents(uint64(e)) + } + n += 1 + sovEvents(uint64(l)) + l + } + l = len(m.Status) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + return n +} + func sovEvents(x uint64) (n int) { for { n++ @@ -545,6 +694,264 @@ func (m *CommonSQLEventDetails) Unmarshal(dAtA []byte) error { } return nil } +func (m *CommonJobEventDetails) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CommonJobEventDetails: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CommonJobEventDetails: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JobID", wireType) + } + m.JobID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JobID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JobType", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.JobType = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Description = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field User", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.User = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType == 0 { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.DescriptorIDs = append(m.DescriptorIDs, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + packedLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.DescriptorIDs) == 0 { + m.DescriptorIDs = make([]uint32, 0, elementCount) + } + for iNdEx < postIndex { + var v uint32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.DescriptorIDs = append(m.DescriptorIDs, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field DescriptorIDs", wireType) + } + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipEvents(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -651,34 +1058,42 @@ var ( ) func init() { - proto.RegisterFile("util/log/eventpb/events.proto", fileDescriptor_events_584d902029f1b16a) + proto.RegisterFile("util/log/eventpb/events.proto", fileDescriptor_events_1dbe437e52f03141) } -var fileDescriptor_events_584d902029f1b16a = []byte{ - // 398 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0xd2, 0x4f, 0x8b, 0x13, 0x31, - 0x18, 0x06, 0xf0, 0x66, 0xbb, 0x2a, 0x0d, 0xbb, 0xfe, 0x09, 0x2e, 0x0c, 0x05, 0xd3, 0x32, 0x17, - 0x2b, 0x2c, 0x9d, 0x83, 0x27, 0x05, 0x2f, 0xdd, 0x2a, 0x2c, 0x88, 0xe0, 0x2a, 0x1e, 0xbc, 0x94, - 0x74, 0xe6, 0x75, 0x36, 0x98, 0xe4, 0x0d, 0x93, 0xb7, 0x85, 0x7e, 0x05, 0x4f, 0x7e, 0x0c, 0x3f, - 0xca, 0x1e, 0xf7, 0xb8, 0xa7, 0xa2, 0xd3, 0x9b, 0x47, 0x3f, 0x81, 0x34, 0x53, 0xba, 0x6b, 0xdd, - 0xd3, 0x4c, 0x78, 0x7f, 0x4f, 0xf2, 0x40, 0xc2, 0x9f, 0xcc, 0x48, 0x9b, 0xcc, 0x60, 0x99, 0xc1, - 0x1c, 0x1c, 0xf9, 0x69, 0xf3, 0x0d, 0x43, 0x5f, 0x21, 0xa1, 0xe8, 0xe6, 0x98, 0x7f, 0xad, 0x50, - 0xe5, 0xe7, 0xc3, 0x35, 0x1c, 0x1a, 0x2c, 0x87, 0x1b, 0xd8, 0x7d, 0x5c, 0x62, 0x89, 0x91, 0x65, - 0xeb, 0xbf, 0x26, 0xd1, 0xed, 0x95, 0x88, 0xa5, 0x81, 0x2c, 0xae, 0xa6, 0xb3, 0x2f, 0x19, 0x69, - 0x0b, 0x81, 0x94, 0xf5, 0x0d, 0x48, 0xbf, 0x31, 0x2e, 0x4e, 0xd0, 0x5a, 0x74, 0xaf, 0xd7, 0x1b, - 0x8d, 0x81, 0x94, 0x36, 0x41, 0x1c, 0xf3, 0xce, 0x56, 0x26, 0xac, 0xcf, 0x06, 0xed, 0xd1, 0xfd, - 0xdf, 0xcb, 0x1e, 0x3f, 0x46, 0xab, 0x09, 0xac, 0xa7, 0xc5, 0xd9, 0x35, 0x10, 0x6f, 0x38, 0x8f, - 0x35, 0x26, 0xb4, 0xf0, 0x90, 0xec, 0xf5, 0xd9, 0xa0, 0x33, 0x7a, 0xfa, 0x2f, 0xff, 0xb3, 0xec, - 0x1d, 0x55, 0x50, 0xa8, 0x9c, 0x5e, 0xa6, 0x0e, 0x5d, 0x00, 0x17, 0x34, 0xe9, 0x39, 0xa4, 0x67, - 0x9d, 0x18, 0xfd, 0xb8, 0xf0, 0x90, 0xfe, 0xd8, 0xe3, 0x47, 0x4d, 0x99, 0x0f, 0xef, 0xdf, 0xee, - 0xf6, 0x09, 0xa4, 0x08, 0x2c, 0x38, 0x8a, 0x7d, 0x3a, 0xff, 0xf7, 0xd9, 0x02, 0x91, 0xf2, 0xfd, - 0x59, 0x80, 0x6a, 0xd3, 0x64, 0x17, 0xc6, 0x99, 0x38, 0xe1, 0x87, 0x05, 0x84, 0xbc, 0xd2, 0x9e, - 0xb0, 0x9a, 0xe8, 0x22, 0x69, 0xf7, 0xd9, 0xe0, 0x70, 0x24, 0xeb, 0x65, 0xef, 0x60, 0xbc, 0x1d, - 0x9c, 0x8e, 0x77, 0xc2, 0x07, 0xd7, 0xa1, 0xd3, 0x42, 0xbc, 0xe0, 0x0f, 0x95, 0xf7, 0x46, 0xe7, - 0x8a, 0x34, 0xba, 0x89, 0x53, 0x16, 0x92, 0xfd, 0x5b, 0x0f, 0x7d, 0x70, 0xc3, 0xbd, 0x53, 0x16, - 0xc4, 0x2b, 0x2e, 0xbc, 0x51, 0x39, 0x9c, 0xa3, 0x29, 0xa0, 0x9a, 0xcc, 0x95, 0x99, 0x41, 0x48, - 0xee, 0xf4, 0xdb, 0xb7, 0x84, 0x1f, 0xdd, 0x90, 0x9f, 0x22, 0x1c, 0x3d, 0xbb, 0xf8, 0x25, 0x5b, - 0x17, 0xb5, 0x64, 0x97, 0xb5, 0x64, 0x57, 0xb5, 0x64, 0x3f, 0x6b, 0xc9, 0xbe, 0xaf, 0x64, 0xeb, - 0x72, 0x25, 0x5b, 0x57, 0x2b, 0xd9, 0xfa, 0x7c, 0x6f, 0xf3, 0x32, 0xa6, 0x77, 0xe3, 0x4d, 0x3f, - 0xff, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xa4, 0x2e, 0x27, 0xfb, 0x5d, 0x02, 0x00, 0x00, +var fileDescriptor_events_1dbe437e52f03141 = []byte{ + // 513 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0x4f, 0x8b, 0xd3, 0x4e, + 0x1c, 0xc6, 0x9b, 0xed, 0xbf, 0x5f, 0xe7, 0xb7, 0x5d, 0x75, 0x70, 0x31, 0x14, 0x4c, 0x4a, 0x2e, + 0x56, 0x58, 0x5a, 0xd1, 0x93, 0x82, 0x08, 0xdd, 0xba, 0xd0, 0xa2, 0x82, 0xab, 0x78, 0xf0, 0x52, + 0x26, 0xc9, 0xd7, 0xec, 0xd4, 0x24, 0xdf, 0x21, 0x33, 0x2d, 0xf4, 0x2d, 0x78, 0xf2, 0x65, 0xf8, + 0x52, 0xf6, 0xb8, 0xc7, 0x3d, 0x05, 0x4d, 0x6f, 0x1e, 0x05, 0xef, 0x92, 0xa4, 0x66, 0xd3, 0x76, + 0x11, 0x3d, 0x25, 0x61, 0x3e, 0x4f, 0xe6, 0xf9, 0x3e, 0xcf, 0x0c, 0xb9, 0x3b, 0x57, 0xdc, 0x1f, + 0xf8, 0xe8, 0x0d, 0x60, 0x01, 0xa1, 0x12, 0x76, 0xfe, 0x94, 0x7d, 0x11, 0xa1, 0x42, 0xda, 0x71, + 0xd0, 0xf9, 0x18, 0x21, 0x73, 0xce, 0xfa, 0x29, 0xd8, 0xf7, 0xd1, 0xeb, 0xaf, 0xc1, 0xce, 0x6d, + 0x0f, 0x3d, 0xcc, 0xb0, 0x41, 0xfa, 0x96, 0x2b, 0x3a, 0xa6, 0x87, 0xe8, 0xf9, 0x30, 0xc8, 0xbe, + 0xec, 0xf9, 0x87, 0x81, 0xe2, 0x01, 0x48, 0xc5, 0x02, 0x91, 0x03, 0xd6, 0x27, 0x8d, 0xd0, 0x63, + 0x0c, 0x02, 0x0c, 0x9f, 0xa7, 0x3f, 0x1a, 0x81, 0x62, 0xdc, 0x97, 0xf4, 0x88, 0xb4, 0x0a, 0x52, + 0xd7, 0xba, 0x5a, 0xaf, 0x3a, 0x3c, 0xf8, 0x1e, 0x9b, 0xe4, 0x08, 0x03, 0xae, 0x20, 0x10, 0x6a, + 0x79, 0x7a, 0x05, 0xd0, 0x13, 0x42, 0x32, 0x1b, 0x53, 0xb5, 0x14, 0xa0, 0xef, 0x75, 0xb5, 0x5e, + 0x6b, 0x78, 0x6f, 0x13, 0xff, 0x11, 0x9b, 0x87, 0x11, 0xb8, 0xcc, 0x51, 0x4f, 0xac, 0x10, 0x43, + 0x09, 0xa1, 0xe4, 0x8a, 0x2f, 0xc0, 0x3a, 0x6d, 0x65, 0xd2, 0xb7, 0x4b, 0x01, 0xd6, 0x97, 0x3d, + 0x72, 0x98, 0x9b, 0x79, 0xf3, 0xfa, 0xc5, 0xb6, 0x1f, 0xa9, 0x98, 0x82, 0x00, 0x42, 0x95, 0xf9, + 0x69, 0xed, 0xfa, 0x29, 0x00, 0x6a, 0x91, 0xda, 0x5c, 0x42, 0xb4, 0x76, 0xb2, 0x0d, 0x66, 0x6b, + 0xf4, 0x98, 0xb4, 0x5d, 0x90, 0x4e, 0xc4, 0x85, 0xc2, 0x68, 0xca, 0x5d, 0xbd, 0xda, 0xd5, 0x7a, + 0xed, 0xa1, 0x91, 0xc4, 0xe6, 0xfe, 0xa8, 0x58, 0x18, 0x8f, 0xb6, 0xc4, 0xfb, 0x57, 0xa2, 0xb1, + 0x4b, 0x1f, 0x93, 0x9b, 0x4c, 0x08, 0x9f, 0x3b, 0x4c, 0x71, 0x0c, 0xa7, 0x21, 0x0b, 0x40, 0xaf, + 0x5d, 0xbb, 0xe9, 0x8d, 0x12, 0xf7, 0x8a, 0x05, 0x40, 0x9f, 0x12, 0x2a, 0x7c, 0xe6, 0xc0, 0x19, + 0xfa, 0x2e, 0x44, 0xd3, 0x05, 0xf3, 0xe7, 0x20, 0xf5, 0x7a, 0xb7, 0x7a, 0x8d, 0xf8, 0x56, 0x89, + 0x7c, 0x97, 0x81, 0xd6, 0xcf, 0x22, 0xaa, 0x09, 0xda, 0x1b, 0x51, 0xf5, 0x49, 0x63, 0x86, 0x76, + 0x3a, 0x51, 0xde, 0xdb, 0x9d, 0x24, 0x36, 0xeb, 0x13, 0xb4, 0x77, 0x46, 0xa9, 0xcf, 0xd0, 0x1e, + 0xbb, 0xf4, 0x25, 0xf9, 0x2f, 0xe5, 0x4b, 0xd5, 0x3d, 0x4c, 0x62, 0xb3, 0x39, 0x41, 0x3b, 0xed, + 0xe4, 0x6f, 0x5b, 0x6c, 0xce, 0x72, 0x9e, 0x3e, 0x20, 0xff, 0xff, 0x8e, 0x88, 0x63, 0x98, 0xa5, + 0xba, 0x3b, 0x50, 0x19, 0x29, 0xda, 0xaa, 0xfd, 0xa1, 0xad, 0x13, 0x72, 0xb0, 0xd1, 0x56, 0x9e, + 0x54, 0x7b, 0x68, 0x26, 0xb1, 0xd9, 0x2e, 0xd7, 0x25, 0xb7, 0xe4, 0xed, 0x72, 0x5f, 0x92, 0x3e, + 0x23, 0x8d, 0xf4, 0x98, 0xcc, 0xa5, 0xde, 0xf8, 0xb7, 0x53, 0xba, 0x96, 0x0d, 0xef, 0x9f, 0x7f, + 0x33, 0x2a, 0xe7, 0x89, 0xa1, 0x5d, 0x24, 0x86, 0x76, 0x99, 0x18, 0xda, 0xd7, 0xc4, 0xd0, 0x3e, + 0xaf, 0x8c, 0xca, 0xc5, 0xca, 0xa8, 0x5c, 0xae, 0x8c, 0xca, 0xfb, 0xe6, 0xfa, 0x46, 0xda, 0x8d, + 0xec, 0x86, 0x3d, 0xfa, 0x15, 0x00, 0x00, 0xff, 0xff, 0x9f, 0x8d, 0xa7, 0xc7, 0xd5, 0x03, 0x00, + 0x00, } diff --git a/pkg/util/log/eventpb/events.proto b/pkg/util/log/eventpb/events.proto index 1d7d99131676..3b4afd6fe98b 100644 --- a/pkg/util/log/eventpb/events.proto +++ b/pkg/util/log/eventpb/events.proto @@ -42,7 +42,7 @@ message CommonSQLEventDetails { // The primary object descriptor affected by the operation. Set to zero for operations // that don't affect descriptors. - uint32 descriptor_id = 3 [(gogoproto.customname) = "DescriptorID" , (gogoproto.jsontag) = ",omitempty"]; + uint32 descriptor_id = 3 [(gogoproto.customname) = "DescriptorID", (gogoproto.jsontag) = ",omitempty"]; // The application name for the session where the event was emitted. // This is included in the event to ease filtering of logging output @@ -53,3 +53,28 @@ message CommonSQLEventDetails { repeated string placeholder_values = 5 [(gogoproto.jsontag) = ",omitempty"]; } +// CommonJobEventDetails contains the fields common to all job events. +message CommonJobEventDetails { + // The ID of the job that triggered the event. + int64 job_id = 1 [(gogoproto.customname) = "JobID", (gogoproto.jsontag) = ",omitempty"]; + + // The type of the job that triggered the event. + string job_type = 2 [(gogoproto.customname) = "JobType", (gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; + + // A description of the job that triggered the event. Some jobs populate the + // description with an approximate representation of the SQL statement run to + // create the job. + string description = 3 [(gogoproto.jsontag) = ",omitempty"]; + + // The user account that triggered the event. + string user = 4 [(gogoproto.jsontag) = ",omitempty"]; + + // The object descriptors affected by the job. Set to zero for operations + // that don't affect descriptors. + repeated uint32 descriptor_ids = 5 [(gogoproto.customname) = "DescriptorIDs", (gogoproto.jsontag) = ",omitempty"]; + + // The status of the job that triggered the event. This allows the job to + // indicate which phase execution it is in when the event is triggered. + string status = 6 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; +} + diff --git a/pkg/util/log/eventpb/gen.go b/pkg/util/log/eventpb/gen.go index 8d42f035997a..28a362e4a651 100644 --- a/pkg/util/log/eventpb/gen.go +++ b/pkg/util/log/eventpb/gen.go @@ -501,6 +501,16 @@ func (m *{{.GoType}}) AppendJSONFields(printComma bool, b redact.RedactableBytes b = append(b, "\"{{.FieldName}}\":"...) b = strconv.AppendUint(b, uint64(m.{{.FieldName}}), 10) } + {{- else if eq .FieldType "array_of_uint32" -}} + if len(m.{{.FieldName}}) > 0 { + if printComma { b = append(b, ',')}; printComma = true + b = append(b, "\"{{.FieldName}}\":["...) + for i, v := range m.{{.FieldName}} { + if i > 0 { b = append(b, ',') } + b = strconv.AppendUint(b, uint64(v), 10) + } + b = append(b, ']') + } {{- else if .IsEnum }} if m.{{.FieldName}} != 0 { if printComma { b = append(b, ',')}; printComma = true diff --git a/pkg/util/log/eventpb/job_events.pb.go b/pkg/util/log/eventpb/job_events.pb.go new file mode 100644 index 000000000000..533b7534cb9a --- /dev/null +++ b/pkg/util/log/eventpb/job_events.pb.go @@ -0,0 +1,564 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: util/log/eventpb/job_events.proto + +package eventpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// Import is recorded when an import job is created and successful completion. +// If the job fails, events will be emitted on job creation, failure, and +// successful revert. +type Import struct { + CommonEventDetails `protobuf:"bytes,1,opt,name=common,proto3,embedded=common" json:""` + CommonJobEventDetails `protobuf:"bytes,2,opt,name=job,proto3,embedded=job" json:""` +} + +func (m *Import) Reset() { *m = Import{} } +func (m *Import) String() string { return proto.CompactTextString(m) } +func (*Import) ProtoMessage() {} +func (*Import) Descriptor() ([]byte, []int) { + return fileDescriptor_job_events_8230347c4526922b, []int{0} +} +func (m *Import) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Import) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *Import) XXX_Merge(src proto.Message) { + xxx_messageInfo_Import.Merge(dst, src) +} +func (m *Import) XXX_Size() int { + return m.Size() +} +func (m *Import) XXX_DiscardUnknown() { + xxx_messageInfo_Import.DiscardUnknown(m) +} + +var xxx_messageInfo_Import proto.InternalMessageInfo + +// Restore is recorded when a restore job is created and successful completion. +// If the job fails, events will be emitted on job creation, failure, and +// successful revert. +type Restore struct { + CommonEventDetails `protobuf:"bytes,1,opt,name=common,proto3,embedded=common" json:""` + CommonJobEventDetails `protobuf:"bytes,2,opt,name=job,proto3,embedded=job" json:""` +} + +func (m *Restore) Reset() { *m = Restore{} } +func (m *Restore) String() string { return proto.CompactTextString(m) } +func (*Restore) ProtoMessage() {} +func (*Restore) Descriptor() ([]byte, []int) { + return fileDescriptor_job_events_8230347c4526922b, []int{1} +} +func (m *Restore) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Restore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *Restore) XXX_Merge(src proto.Message) { + xxx_messageInfo_Restore.Merge(dst, src) +} +func (m *Restore) XXX_Size() int { + return m.Size() +} +func (m *Restore) XXX_DiscardUnknown() { + xxx_messageInfo_Restore.DiscardUnknown(m) +} + +var xxx_messageInfo_Restore proto.InternalMessageInfo + +func init() { + proto.RegisterType((*Import)(nil), "cockroach.util.log.eventpb.Import") + proto.RegisterType((*Restore)(nil), "cockroach.util.log.eventpb.Restore") +} +func (m *Import) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Import) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintJobEvents(dAtA, i, uint64(m.CommonEventDetails.Size())) + n1, err := m.CommonEventDetails.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + dAtA[i] = 0x12 + i++ + i = encodeVarintJobEvents(dAtA, i, uint64(m.CommonJobEventDetails.Size())) + n2, err := m.CommonJobEventDetails.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + return i, nil +} + +func (m *Restore) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Restore) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintJobEvents(dAtA, i, uint64(m.CommonEventDetails.Size())) + n3, err := m.CommonEventDetails.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + dAtA[i] = 0x12 + i++ + i = encodeVarintJobEvents(dAtA, i, uint64(m.CommonJobEventDetails.Size())) + n4, err := m.CommonJobEventDetails.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + return i, nil +} + +func encodeVarintJobEvents(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Import) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.CommonEventDetails.Size() + n += 1 + l + sovJobEvents(uint64(l)) + l = m.CommonJobEventDetails.Size() + n += 1 + l + sovJobEvents(uint64(l)) + return n +} + +func (m *Restore) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.CommonEventDetails.Size() + n += 1 + l + sovJobEvents(uint64(l)) + l = m.CommonJobEventDetails.Size() + n += 1 + l + sovJobEvents(uint64(l)) + return n +} + +func sovJobEvents(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozJobEvents(x uint64) (n int) { + return sovJobEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Import) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Import: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Import: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CommonEventDetails", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.CommonEventDetails.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CommonJobEventDetails", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.CommonJobEventDetails.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipJobEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthJobEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Restore) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Restore: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Restore: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CommonEventDetails", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.CommonEventDetails.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CommonJobEventDetails", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowJobEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthJobEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.CommonJobEventDetails.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipJobEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthJobEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipJobEvents(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowJobEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowJobEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowJobEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthJobEvents + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowJobEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipJobEvents(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthJobEvents = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowJobEvents = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("util/log/eventpb/job_events.proto", fileDescriptor_job_events_8230347c4526922b) +} + +var fileDescriptor_job_events_8230347c4526922b = []byte{ + // 243 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x2c, 0x2d, 0xc9, 0xcc, + 0xd1, 0xcf, 0xc9, 0x4f, 0xd7, 0x4f, 0x2d, 0x4b, 0xcd, 0x2b, 0x29, 0x48, 0xd2, 0xcf, 0xca, 0x4f, + 0x8a, 0x07, 0xb3, 0x8b, 0xf5, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0xa4, 0x92, 0xf3, 0x93, 0xb3, + 0x8b, 0xf2, 0x13, 0x93, 0x33, 0xf4, 0x40, 0x8a, 0xf5, 0x72, 0xf2, 0xd3, 0xf5, 0xa0, 0x8a, 0xa5, + 0x44, 0xd2, 0xf3, 0xd3, 0xf3, 0xc1, 0xca, 0xf4, 0x41, 0x2c, 0x88, 0x0e, 0x29, 0x59, 0x0c, 0x43, + 0x91, 0x0d, 0x54, 0xda, 0xc8, 0xc8, 0xc5, 0xe6, 0x99, 0x5b, 0x90, 0x5f, 0x54, 0x22, 0x14, 0xc2, + 0xc5, 0x96, 0x9c, 0x9f, 0x9b, 0x9b, 0x9f, 0x27, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x6d, 0xa4, 0xa7, + 0x87, 0xdb, 0x32, 0x3d, 0x67, 0xb0, 0x4a, 0x57, 0x10, 0xcf, 0x25, 0xb5, 0x24, 0x31, 0x33, 0xa7, + 0xd8, 0x89, 0xe7, 0xc4, 0x3d, 0x79, 0x86, 0x0b, 0xf7, 0xe4, 0x19, 0x5f, 0xdd, 0x93, 0x67, 0x08, + 0x82, 0x9a, 0x25, 0x14, 0xc8, 0xc5, 0x9c, 0x95, 0x9f, 0x24, 0xc1, 0x04, 0x36, 0xd2, 0x90, 0xb0, + 0x91, 0x5e, 0xf9, 0x49, 0x78, 0x4c, 0x05, 0x99, 0xa5, 0xb4, 0x89, 0x91, 0x8b, 0x3d, 0x28, 0xb5, + 0xb8, 0x24, 0xbf, 0x28, 0x75, 0xc8, 0x38, 0xda, 0x49, 0xf3, 0xc4, 0x43, 0x39, 0x86, 0x13, 0x8f, + 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0xbc, 0xf1, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09, + 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0x8a, 0x1d, 0x6a, 0x66, + 0x12, 0x1b, 0x38, 0x6a, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x09, 0x10, 0x78, 0xa6, 0x10, + 0x02, 0x00, 0x00, +} diff --git a/pkg/util/log/eventpb/job_events.proto b/pkg/util/log/eventpb/job_events.proto new file mode 100644 index 000000000000..4bf2b02dc0ab --- /dev/null +++ b/pkg/util/log/eventpb/job_events.proto @@ -0,0 +1,45 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.util.log.eventpb; +option go_package = "eventpb"; + +import "gogoproto/gogo.proto"; +import "util/log/eventpb/events.proto"; + +// Category: Job events +// Channel: OPS +// +// Events in this category pertain to long-running jobs that are orchestrated by +// a node's job registry. These system processes can create and/or modify stored +// objects during the course of their execution. +// +// A job might choose to emit multiple events during its execution when +// transitioning from one "state" to another. +// Egs: IMPORT/RESTORE will emit events on job creation and successful +// completion. If the job fails, events will be emitted on job creation, +// failure, and successful revert. + +// Import is recorded when an import job is created and successful completion. +// If the job fails, events will be emitted on job creation, failure, and +// successful revert. +message Import { + CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; + CommonJobEventDetails job = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; +} + +// Restore is recorded when a restore job is created and successful completion. +// If the job fails, events will be emitted on job creation, failure, and +// successful revert. +message Restore { + CommonEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; + CommonJobEventDetails job = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true]; +} diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index 1db3a298cb74..c131bf831c28 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -996,6 +996,80 @@ func (m *CommonEventDetails) AppendJSONFields(printComma bool, b redact.Redactab return printComma, b } +// AppendJSONFields implements the EventPayload interface. +func (m *CommonJobEventDetails) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { + + if m.JobID != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"JobID\":"...) + b = strconv.AppendInt(b, int64(m.JobID), 10) + } + + if m.JobType != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"JobType\":\""...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), m.JobType)) + b = append(b, '"') + } + + if m.Description != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"Description\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.Description))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if m.User != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"User\":\""...) + b = append(b, redact.StartMarker()...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), string(redact.EscapeMarkers([]byte(m.User))))) + b = append(b, redact.EndMarker()...) + b = append(b, '"') + } + + if len(m.DescriptorIDs) > 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"DescriptorIDs\":["...) + for i, v := range m.DescriptorIDs { + if i > 0 { + b = append(b, ',') + } + b = strconv.AppendUint(b, uint64(v), 10) + } + b = append(b, ']') + } + + if m.Status != "" { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"Status\":\""...) + b = redact.RedactableBytes(jsonbytes.EncodeString([]byte(b), m.Status)) + b = append(b, '"') + } + + return printComma, b +} + // AppendJSONFields implements the EventPayload interface. func (m *CommonNodeDecommissionDetails) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { @@ -1993,6 +2067,16 @@ func (m *FinishSchemaChangeRollback) AppendJSONFields(printComma bool, b redact. return printComma, b } +// AppendJSONFields implements the EventPayload interface. +func (m *Import) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { + + printComma, b = m.CommonEventDetails.AppendJSONFields(printComma, b) + + printComma, b = m.CommonJobEventDetails.AppendJSONFields(printComma, b) + + return printComma, b +} + // AppendJSONFields implements the EventPayload interface. func (m *NodeDecommissioned) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { @@ -2203,6 +2287,16 @@ func (m *RenameType) AppendJSONFields(printComma bool, b redact.RedactableBytes) return printComma, b } +// AppendJSONFields implements the EventPayload interface. +func (m *Restore) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) { + + printComma, b = m.CommonEventDetails.AppendJSONFields(printComma, b) + + printComma, b = m.CommonJobEventDetails.AppendJSONFields(printComma, b) + + return printComma, b +} + // AppendJSONFields implements the EventPayload interface. func (m *ReverseSchemaChange) AppendJSONFields(printComma bool, b redact.RedactableBytes) (bool, redact.RedactableBytes) {