Skip to content

Commit

Permalink
pkg/util/log: unify explicit flush of network and file sinks
Browse files Browse the repository at this point in the history
Ref: cockroachdb#101562

both for files *and* network sinks, such as fluent-servers.

I received some feedback that we shouldn't divide these
flush operations based on sink type, and instead we should
unify the flush operation to flush both (as the crash reporter
already does).

The existing function to flush just the file sinks is
quite widely used. Introducing flushing of network sinks
begs the question, "What if this adds to the runtime of
code using this explicit flush mechanism?"

Well, the existing FlushFileSinks calls fsync() [1] with
F_FULLFSYNC [2]. IIUC, this means that it already is a
blocking operation as the fsync() call will wait for the
buffered data to be written to permanent storage (not just
the disk cache). Given that, I think any caller of this
function already assumes it's a blocking operation and
therefore should be tolerant of that.

[1]: https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/fsync.2.html
[2]: https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/fcntl.2.html#//apple_ref/doc/man/2/fcntl

Nonetheless, we implement a timeout mechanism for the
flushing of the buffered network sinks as an added
protection.
  • Loading branch information
abarganier committed May 15, 2023
1 parent 5c97460 commit 1629e11
Show file tree
Hide file tree
Showing 38 changed files with 114 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func requireRecoveryEvent(
expected eventpb.RecoveryEvent,
) {
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(
startTime,
math.MaxInt64,
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1802,7 +1802,7 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {

cdcTest(t, testFn)

log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"),
log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2172,7 +2172,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)

log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2358,7 +2358,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {

cdcTestWithSystem(t, testFn)

log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2410,7 +2410,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
}

cdcTestWithSystem(t, testFn)
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down Expand Up @@ -2517,7 +2517,7 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
}

cdcTest(t, testFn)
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)
func checkStructuredLogs(t *testing.T, eventType string, startTime int64) []string {
var matchingEntries []string
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestChangefeedNemeses(t *testing.T) {
// nemeses_test.go:39: pq: unimplemented: operation is
// unsupported in multi-tenancy mode
cdcTest(t, testFn, feedTestNoTenants)
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestTelemetryLogRegions(t *testing.T) {
sqlDB.Exec(t, tc.query)
}

log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
execTimestamp++
}

log.FlushFileSinks()
log.Flush()

var filteredSampleQueries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/testccl/authccl/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestClientAddrOverride(t *testing.T) {
t.Run("check-server-log-uses-override", func(t *testing.T) {
// Wait for the disconnection event in logs.
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand All @@ -541,7 +541,7 @@ func TestClientAddrOverride(t *testing.T) {
})

// Now we want to check that the logging tags are also updated.
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/testccl/sqlccl/tenant_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) {

checkGCBlockedByPTS := func(t *testing.T, sj *jobs.StartableJob, tenID uint64) {
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile(fmt.Sprintf("GC TTL for dropped tenant %d has expired, but protected timestamp record\\(s\\)", tenID)),
log.WithFlattenedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func runConnectInit(cmd *cobra.Command, args []string) (retErr error) {
}

// Ensure that log files are populated when the process terminates.
defer log.FlushFileSinks()
defer log.Flush()

peers := []string(serverCfg.JoinList)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_send_kv_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestSendKVBatch(t *testing.T) {
require.JSONEq(t, jsonResponse, output)

// Check that a structured log event was emitted.
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(start.UnixNano(), timeutil.Now().UnixNano(), 1,
regexp.MustCompile("debug_send_kv_batch"), log.WithFlattenedSensitiveData)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func runDemoInternal(
}

// Ensure the last few entries in the log files are flushed at the end.
defer log.FlushFileSinks()
defer log.Flush()

return sqlCtx.Run(ctx, conn)
}
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ func createAndStartServerAsync(

go func() {
// Ensure that the log files see the startup messages immediately.
defer log.FlushFileSinks()
defer log.Flush()
// If anything goes dramatically wrong, use Go's panic/recover
// mechanism to intercept the panic and log the panic details to
// the error reporting server.
Expand Down Expand Up @@ -1524,7 +1524,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI
// Ensure the configuration logging is written to disk in case a
// process is waiting for the sdnotify readiness to read important
// information from there.
log.FlushFileSinks()
log.Flush()

// Signal readiness. This unblocks the process when running with
// --background or under systemd.
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobstest/logutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func CheckEmittedEvents(
) {
// Check that the structured event was logged.
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ SELECT unnest(execution_errors)
t *testing.T, id jobspb.JobID, status jobs.Status,
from, to time.Time, cause string,
) {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(
from.UnixNano(), to.UnixNano(), 2,
regexp.MustCompile(fmt.Sprintf(
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func TestCorruptData(t *testing.T) {
require.NoError(t, err)
}

log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
require.NoError(t, err)
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestCorruptData(t *testing.T) {
require.Nil(t, got)
_, err = pts.GetState(ctx)
require.NoError(t, err)
log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13530,7 +13530,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
if _, pErr := tc.repl.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}
log.FlushFileSinks()
log.Flush()

stopper.Quiesce(ctx)
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) {

// Flush logs and get log messages from replicate_queue.go since just
// before calling store.Enqueue(..).
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/certmgr/cert_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)

// Check that the structured event was logged.
func checkLogStructEntry(t *testing.T, expectSuccess bool, beforeReload time.Time) error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/certs_rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestRotateCerts(t *testing.T) {
// the moment the structured logging event is actually
// written to the log file.
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestPersistHLCUpperBound(t *testing.T) {
var fatal bool
defer log.ResetExitFunc()
log.SetExitFunc(true /* hideStack */, func(r exit.Code) {
defer log.FlushFileSinks()
defer log.Flush()
if r == exit.FatalError() {
fatal = true
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ func (s *statusServer) LogFilesList(
}
return status.LogFilesList(ctx, req)
}
log.FlushFileSinks()
log.Flush()
logFiles, err := log.ListLogFiles()
if err != nil {
return nil, serverError(ctx, err)
Expand Down Expand Up @@ -1278,7 +1278,7 @@ func (s *statusServer) LogFile(
inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)

// Ensure that the latest log entries are available in files.
log.FlushFileSinks()
log.Flush()

// Read the logs.
reader, err := log.GetLogReader(req.File)
Expand Down Expand Up @@ -1408,7 +1408,7 @@ func (s *statusServer) Logs(
}

// Ensure that the latest log entries are available in files.
log.FlushFileSinks()
log.Flush()

// Read the logs.
entries, err := log.FetchEntriesFromFiles(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status/runtime_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestStructuredEventLogging(t *testing.T) {
time.Sleep(10 * time.Second)

// Ensure that the entry hits the OS so it can be read back below.
log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/structlogging/hot_ranges_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestHotRangesStats(t *testing.T) {
})

testutils.SucceedsWithin(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/admin_audit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestAdminAuditLogBasic(t *testing.T) {
db.Exec(t, `SELECT 1;`)

var selectAdminRe = regexp.MustCompile(`"EventType":"admin_query","Statement":"SELECT ‹1›","Tag":"SELECT","User":"root"`)
log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectAdminRe,
log.WithMarkedSensitiveData)
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestAdminAuditLogRegularUser(t *testing.T) {

var selectRe = regexp.MustCompile(`SELECT 1`)

log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectRe,
log.WithMarkedSensitiveData)
Expand Down Expand Up @@ -180,7 +180,7 @@ COMMIT;
},
}

log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -275,7 +275,7 @@ COMMIT;
},
}

log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(
0,
Expand Down Expand Up @@ -319,7 +319,7 @@ COMMIT;
t.Fatal(err)
}

log.FlushFileSinks()
log.Flush()

entries, err = log.FetchEntriesFromFiles(
0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/event_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestStructuredEventLogging(t *testing.T) {
}

// Ensure that the entries hit the OS so they can be read back below.
log.FlushFileSinks()
log.Flush()

entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 10000, execLogRe, log.WithMarkedSensitiveData)
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestPerfLogging(t *testing.T) {
}

var logRe = regexp.MustCompile(tc.logRe)
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(
start, math.MaxInt64, 1000, logRe, log.WithMarkedSensitiveData,
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/pgwire/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ func TestClientAddrOverride(t *testing.T) {
t.Run("check-server-log-uses-override", func(t *testing.T) {
// Wait for the disconnection event in logs.
testutils.SucceedsSoon(t, func() error {
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand All @@ -747,7 +747,7 @@ func TestClientAddrOverride(t *testing.T) {
})

// Now we want to check that the logging tags are also updated.
log.FlushFileSinks()
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe,
log.WithMarkedSensitiveData)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func checkNumTotalEntriesAndNumIndexEntries(
expectedIndividualIndexEntries int,
scheduleCompleteChan chan struct{},
) error {
log.FlushFileSinks()
log.Flush()
// Fetch log entries.
entries, err := log.FetchEntriesFromFiles(
0,
Expand Down
Loading

0 comments on commit 1629e11

Please sign in to comment.