log,importccl,backupccl: add event logs for import and restore jobs
This change adds a new `CommonJobEventDetails` and two implementations
of the interface in `Import` and `Restore`, to structure the events
emitted during different phases of the job execution.

Both Import/Restore will emit logs at the following stages:
- When the job has completed preparing and is beginning execution.
- When the job has succeeded.
- When the job has started reverting.
- When the job has finished reverting and is in a terminal failed state.

Release note (sql change): IMPORT and RESTORE now emit structured events
to the OPS channel. Refer to the generated documentation for details.
adityamaru committed Mar 31, 2021
1 parent 46a8edd commit afa9b6d
Showing 22 changed files with 1,651 additions and 38 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -1576,7 +1576,8 @@ EVENTLOG_PROTOS = \
pkg/util/log/eventpb/zone_events.proto \
pkg/util/log/eventpb/session_events.proto \
pkg/util/log/eventpb/sql_audit_events.proto \
-pkg/util/log/eventpb/cluster_events.proto
+pkg/util/log/eventpb/cluster_events.proto \
+pkg/util/log/eventpb/job_events.proto

LOGSINKDOC_DEP = pkg/util/log/logconfig/config.go

59 changes: 59 additions & 0 deletions docs/generated/eventlog.md
@@ -137,6 +137,65 @@ after being offline.
| `StartedAt` | The time when this node was last started. | no |
| `LastUp` | The approximate last time the node was up before the last restart. | no |

## Job events

Events in this category pertain to long-running jobs that are orchestrated by
a node's job registry. These system processes can create and/or modify stored
objects during the course of their execution.

A job might choose to emit multiple events during its execution when
transitioning from one "state" to another.
For example, IMPORT and RESTORE emit events on job creation and on successful
completion. If the job fails, events are emitted on job creation,
failure, and successful revert.

Events in this category are logged to channel OPS.


### `import`

An event of type `import` is recorded when an import job is created and when it completes successfully.
If the job fails, events are emitted on job creation, failure, and
successful revert.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `JobID` | The ID of the job that triggered the event. | no |
| `JobType` | The type of the job that triggered the event. | no |
| `Description` | A description of the job that triggered the event. Some jobs populate the description with an approximate representation of the SQL statement run to create the job. | yes |
| `User` | The user account that triggered the event. | yes |
| `DescriptorIDs` | The object descriptors affected by the job. Set to zero for operations that don't affect descriptors. | yes |
| `Status` | The status of the job that triggered the event. This allows the job to indicate which phase of execution it is in when the event is triggered. | no |

### `restore`

An event of type `restore` is recorded when a restore job is created and when it completes successfully.
If the job fails, events are emitted on job creation, failure, and
successful revert.




#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |
| `JobID` | The ID of the job that triggered the event. | no |
| `JobType` | The type of the job that triggered the event. | no |
| `Description` | A description of the job that triggered the event. Some jobs populate the description with an approximate representation of the SQL statement run to create the job. | yes |
| `User` | The user account that triggered the event. | yes |
| `DescriptorIDs` | The object descriptors affected by the job. Set to zero for operations that don't affect descriptors. | yes |
| `Status` | The status of the job that triggered the event. This allows the job to indicate which phase of execution it is in when the event is triggered. | no |

## Miscellaneous SQL events

Events in this category report miscellaneous SQL events.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"split_and_scatter_processor.go",
"system_schema.go",
"targets.go",
"testutils.go",
],
embed = [":backupccl_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl",
@@ -86,11 +87,13 @@ go_library(
"//pkg/storage",
"//pkg/storage/cloud",
"//pkg/storage/cloudimpl",
"//pkg/testutils",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/retry",
@@ -104,6 +107,7 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_gorhill_cronexpr//:cronexpr",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
],
)

63 changes: 63 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -7707,3 +7707,66 @@ table_name FROM [SHOW TABLES] ORDER BY schema_name, table_name`
return nil
})
}

func TestRestoreJobEventLogging(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.ScopeWithoutShowLogs(t).Close(t)

defer jobs.TestingSetProgressThresholds()()
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

baseDir := "testdata"
args := base.TestServerArgs{ExternalIODir: baseDir}
params := base.TestClusterArgs{ServerArgs: args}
_, tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 1,
InitManualReplication, params)
defer cleanupFn()

var forceFailure bool
for i := range tc.Servers {
tc.Servers[i].JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.beforePublishingDescriptors = func() error {
if forceFailure {
return errors.New("testing injected failure")
}
return nil
}
return r
},
}
}

sqlDB.Exec(t, `CREATE DATABASE r1`)
sqlDB.Exec(t, `CREATE TABLE r1.foo (id INT)`)
sqlDB.Exec(t, `BACKUP DATABASE r1 TO 'nodelocal://0/eventlogging'`)
sqlDB.Exec(t, `DROP DATABASE r1`)

beforeRestore := timeutil.Now()
restoreQuery := `RESTORE DATABASE r1 FROM 'nodelocal://0/eventlogging'`

var jobID int64
var unused interface{}
sqlDB.QueryRow(t, restoreQuery).Scan(&jobID, &unused, &unused, &unused, &unused,
&unused)

expectedStatus := []string{string(jobs.StatusSucceeded), string(jobs.StatusRunning)}
CheckEmittedEvents(t, expectedStatus, beforeRestore.UnixNano(), jobID, "restore",
"RESTORE")

sqlDB.Exec(t, `DROP DATABASE r1`)

// Now let's test the events that are emitted when a job fails.
forceFailure = true
beforeSecondRestore := timeutil.Now()
sqlDB.ExpectErrSucceedsSoon(t, "testing injected failure", restoreQuery)

row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] WHERE status = 'failed'")
row.Scan(&jobID)

expectedStatus = []string{string(jobs.StatusFailed), string(jobs.StatusReverting),
string(jobs.StatusRunning)}
CheckEmittedEvents(t, expectedStatus, beforeSecondRestore.UnixNano(), jobID,
"restore", "RESTORE")
}
34 changes: 33 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
@@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -1345,6 +1346,9 @@ func createImportingDescriptors(
// Update the job once all descs have been prepared for ingestion.
err := r.job.SetDetails(ctx, txn, details)

// Emit to the event log now that the job has finished preparing descs.
emitRestoreJobEvent(ctx, p, jobs.StatusRunning, r.job)

return err
})
if err != nil {
@@ -1507,6 +1511,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
return err
}
}
emitRestoreJobEvent(ctx, p, jobs.StatusSucceeded, r.job)
return nil
}

@@ -1613,6 +1618,9 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error

r.restoreStats = resTotal

// Emit an event now that the restore job has completed.
emitRestoreJobEvent(ctx, p, jobs.StatusSucceeded, r.job)

// Collect telemetry.
{
numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
@@ -1850,19 +1858,36 @@ func (r *restoreResumer) publishDescriptors(
return nil
}

func emitRestoreJobEvent(
ctx context.Context, p sql.JobExecContext, status jobs.Status, job *jobs.Job,
) {
// Record the job's transition to the given status in the event log. A
// failure to log is reported as a warning and does not fail the job.
var restoreEvent eventpb.Restore
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return sql.LogEventForJobs(ctx, p.ExecCfg(), txn, &restoreEvent, int64(job.ID()),
job.Payload(), p.User(), status)
}); err != nil {
log.Warningf(ctx, "failed to log event: %v", err)
}
}

// OnFailOrCancel is part of the jobs.Resumer interface. Removes KV data that
// has been committed from a restore that has failed or been canceled. It does
// this by adding the table descriptors in DROP state, which causes the schema
// change stuff to delete the keys in the background.
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
p := execCtx.(sql.JobExecContext)
// Emit to the event log that the job has started reverting.
emitRestoreJobEvent(ctx, p, jobs.StatusReverting, r.job)

telemetry.Count("restore.total.failed")
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

details := r.job.Details().(jobspb.RestoreDetails)

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor,
err := descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor,
execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
for _, tenant := range details.Tenants {
tenant.State = descpb.TenantInfo_DROP
@@ -1874,6 +1899,13 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
}
return r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol)
})
if err != nil {
return err
}

// Emit to the event log that the job has completed reverting.
emitRestoreJobEvent(ctx, p, jobs.StatusFailed, r.job)
return nil
}

// dropDescriptors implements the OnFailOrCancel logic.
68 changes: 68 additions & 0 deletions pkg/ccl/backupccl/testutils.go
@@ -0,0 +1,68 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl

import (
"encoding/json"
"math"
"regexp"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// CheckEmittedEvents is a helper method used by IMPORT and RESTORE tests to
// ensure events are emitted deterministically.
func CheckEmittedEvents(
t *testing.T,
expectedStatus []string,
startTime int64,
jobID int64,
expectedMessage, expectedJobType string,
) {
// Check that the structured event was logged.
testutils.SucceedsSoon(t, func() error {
log.Flush()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
t.Fatal(err)
}
foundEntry := false
for i, e := range entries {
if !strings.Contains(e.Message, expectedMessage) {
continue
}
foundEntry = true
// TODO(knz): Remove this when crdb-v2 becomes the new format.
e.Message = strings.TrimPrefix(e.Message, "Structured entry:")
// crdb-v2 starts json with an equal sign.
e.Message = strings.TrimPrefix(e.Message, "=")
jsonPayload := []byte(e.Message)
var ev eventpb.CommonJobEventDetails
if err := json.Unmarshal(jsonPayload, &ev); err != nil {
t.Errorf("unmarshalling %q: %v", e.Message, err)
}
require.Equal(t, expectedJobType, ev.JobType)
require.Equal(t, jobID, ev.JobID)
require.Equal(t, expectedStatus[i], ev.Status)
}
if !foundEntry {
return errors.New("structured entry for job event not found in log")
}
return nil
})
}

var cmLogRe = regexp.MustCompile(`event_log\.go`)
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -73,6 +73,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/timeutil",