Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/util/log: flush buffered network sinks on panic #109186

Merged
merged 2 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/ccl/auditloggingccl/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) {
// Run a test query.
rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`)

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand All @@ -87,7 +87,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) {
// Run a test query.
rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`)

log.Flush()
log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestSingleRoleAuditLogging(t *testing.T) {
rootRunner.Exec(t, fmt.Sprintf("REVOKE %s from testuser", td.role))
}

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestMultiRoleAuditLogging(t *testing.T) {
testRunner.Exec(t, query)
}

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestReducedAuditConfig(t *testing.T) {
// for the user at that time.
testRunner.Exec(t, testQuery)

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand All @@ -439,7 +439,7 @@ func TestReducedAuditConfig(t *testing.T) {
// The user now has a corresponding audit setting. We use a new query here to differentiate.
testRunner2.Exec(t, `GRANT SELECT ON TABLE u TO root`)

log.Flush()
log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func requireRecoveryEvent(
expected eventpb.RecoveryEvent,
) {
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(
startTime,
math.MaxInt64,
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {

cdcTest(t, testFn)

log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"),
log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2127,7 +2127,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)

log.Flush()
log.FlushFiles()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it looks like we do this a lot in tests.

log.FlushFiles()
log.FetchEntriesFromFiles(...)

It would be nice if you didn't have to worry about flushing in tests. Maybe this could be one function call to a test helper FlushAndFetchEntriesFromFiles. I don't feel too strongly about this though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could just roll it into the existing FetchEntriesFromFiles without renaming it?

Presumably, if you want to fetch an entry from a file, you want to flush the thing first 😅

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2323,7 +2323,7 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {

cdcTestWithSystem(t, testFn)

log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2480,7 +2480,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {

cdcTestWithSystem(t, testFn)

log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2543,7 +2543,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
}

cdcTestWithSystem(t, testFn)
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2574,7 +2574,7 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
}

cdcTest(t, testFn)
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)
func checkStructuredLogs(t *testing.T, eventType string, startTime int64) []string {
var matchingEntries []string
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestChangefeedNemeses(t *testing.T) {
//
// TODO(knz): This seems incorrect, see issue #109417.
cdcTest(t, testFn, feedTestNoTenants)
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestTelemetryLogRegions(t *testing.T) {
sqlDB.Exec(t, tc.query)
}

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
execTimestamp++
}

log.Flush()
log.FlushFiles()

var filteredSampleQueries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/testccl/authccl/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func TestClientAddrOverride(t *testing.T) {
t.Run("check-server-log-uses-override", func(t *testing.T) {
// Wait for the disconnection event in logs.
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand All @@ -539,7 +539,7 @@ func TestClientAddrOverride(t *testing.T) {
})

// Now we want to check that the logging tags are also updated.
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/testccl/sqlccl/tenant_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) {

checkGCBlockedByPTS := func(t *testing.T, sj *jobs.StartableJob, tenID uint64) {
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile(fmt.Sprintf("GC TTL for dropped tenant %d has expired, but protected timestamp record\\(s\\)", tenID)),
log.WithFlattenedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func runConnectInit(cmd *cobra.Command, args []string) (retErr error) {
}

// Ensure that log files are populated when the process terminates.
defer log.Flush()
defer log.FlushFiles()

peers := []string(serverCfg.JoinList)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_send_kv_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestSendKVBatch(t *testing.T) {
require.JSONEq(t, jsonResponse, output)

// Check that a structured log event was emitted.
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(start.UnixNano(), timeutil.Now().UnixNano(), 1,
regexp.MustCompile("debug_send_kv_batch"), log.WithFlattenedSensitiveData)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func runDemoInternal(
}

// Ensure the last few entries in the log files are flushed at the end.
defer log.Flush()
defer log.FlushFiles()

return sqlCtx.Run(ctx, conn)
}
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func createAndStartServerAsync(

go func() {
// Ensure that the log files see the startup messages immediately.
defer log.Flush()
defer log.FlushAllSync()
// If anything goes dramatically wrong, use Go's panic/recover
// mechanism to intercept the panic and log the panic details to
// the error reporting server.
Expand Down Expand Up @@ -1497,7 +1497,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI
// Ensure the configuration logging is written to disk in case a
// process is waiting for the sdnotify readiness to read important
// information from there.
log.Flush()
log.FlushAllSync()

// Signal readiness. This unblocks the process when running with
// --background or under systemd.
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobstest/logutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func CheckEmittedEvents(
) {
// Check that the structured event was logged.
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ SELECT unnest(execution_errors)
t *testing.T, id jobspb.JobID, status jobs.Status,
from, to time.Time, cause string,
) {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(
from.UnixNano(), to.UnixNano(), 2,
regexp.MustCompile(fmt.Sprintf(
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ func TestReceiveSnapshotLogging(t *testing.T) {
// When ready, flush logs and check messages from store_raft.go since
// call to repl.ChangeReplicas(..).
<-signals.receiverDoneCh
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`store_raft\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestCorruptData(t *testing.T) {
require.NoError(t, err)
}

log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
require.NoError(t, err)
Expand Down Expand Up @@ -739,7 +739,7 @@ func TestCorruptData(t *testing.T) {
require.Nil(t, got)
_, err = pts.GetState(ctx)
require.NoError(t, err)
log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) {

// Flush logs and get log messages from replicate_queue.go since just
// before calling store.Enqueue(..).
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/certmgr/cert_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)

// Check that the structured event was logged.
func checkLogStructEntry(t *testing.T, expectSuccess bool, beforeReload time.Time) error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/certs_rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestRotateCerts(t *testing.T) {
// the moment the structured logging event is actually
// written to the log file.
testutils.SucceedsSoon(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func TestPersistHLCUpperBound(t *testing.T) {
var fatal bool
defer log.ResetExitFunc()
log.SetExitFunc(true /* hideStack */, func(r exit.Code) {
defer log.Flush()
defer log.FlushFiles()
if r == exit.FatalError() {
fatal = true
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ func (s *statusServer) LogFilesList(
}
return status.LogFilesList(ctx, req)
}
log.Flush()
log.FlushFiles()
logFiles, err := log.ListLogFiles()
if err != nil {
return nil, srverrors.ServerError(ctx, err)
Expand Down Expand Up @@ -1277,7 +1277,7 @@ func (s *statusServer) LogFile(
inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)

// Ensure that the latest log entries are available in files.
log.Flush()
log.FlushFiles()

// Read the logs.
reader, err := log.GetLogReader(req.File)
Expand Down Expand Up @@ -1407,7 +1407,7 @@ func (s *statusServer) Logs(
}

// Ensure that the latest log entries are available in files.
log.Flush()
log.FlushFiles()

// Read the logs.
entries, err := log.FetchEntriesFromFiles(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status/runtime_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestStructuredEventLogging(t *testing.T) {
time.Sleep(10 * time.Second)

// Ensure that the entry hits the OS so it can be read back below.
log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/structlogging/hot_ranges_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestHotRangesStats(t *testing.T) {
})

testutils.SucceedsWithin(t, func() error {
log.Flush()
log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/admin_audit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestAdminAuditLogBasic(t *testing.T) {
db.Exec(t, `SELECT 1;`)

var selectAdminRe = regexp.MustCompile(`"EventType":"admin_query","Statement":"SELECT ‹1›","Tag":"SELECT","User":"root"`)
log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectAdminRe,
log.WithMarkedSensitiveData)
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestAdminAuditLogRegularUser(t *testing.T) {

var selectRe = regexp.MustCompile(`SELECT 1`)

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectRe,
log.WithMarkedSensitiveData)
Expand Down Expand Up @@ -168,7 +168,7 @@ COMMIT;
},
}

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -253,7 +253,7 @@ COMMIT;
},
}

log.Flush()
log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -297,7 +297,7 @@ COMMIT;
t.Fatal(err)
}

log.Flush()
log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
Expand Down
Loading