Skip to content

Commit

Permalink
server: Implement logsink for the system event logs.
Browse files Browse the repository at this point in the history
Fixes cockroachdb#45643

Cockroach server logs important system events into the eventlog table.
These events are exposed on the web UI.  However, the operators often
want to see those global events while tailing a log file on a single
node.

Implement a mechanism for the server running on each node
to emit those system events into server log file.

If the system log scanning is enabled (via server.eventlogsink.enabled setting),
then each node scans the system log table periodically,
once every server.eventlogsink.period.

For example, below is a single system event emitted to the regular log file:
  I200323 .... [n1] system.eventlog:n=1:'set_cluster_setting':2020-03-23 19:24:29.948279
    +0000 UTC '{"SettingName":"server.eventlogsink.max_entries","Value":"101","User":"root"}'

There is no guarantee that all events from the system log will be emitted.
In particular, upon node restart, we only emit events that were generated
from that point on.  Also, if for whatever reason, we start emitting
too many system log messages, then only up to the
server.eventlogsink.max_entries (default 100) recent events will be emitted.
However, if we think we have "dropped" some events due to configuration settings,
we will indicate so in the log.

Release notes (feature): Log system wide events into cockroach.log
file on every node.
  • Loading branch information
Yevgeniy Miretskiy committed Mar 23, 2020
1 parent 0f220ff commit 5c55957
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,8 @@ func (s *Server) Start(ctx context.Context) error {

// Start garbage collecting system events.
s.startSystemLogsGC(ctx)
// Start system.eventlog sink to local logs
s.startEventlogSink(ctx)

// Serve UI assets.
//
Expand Down
193 changes: 193 additions & 0 deletions pkg/server/server_systemlog_logsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Cluster settings that control how system.eventlog entries are copied into
// the node log.
var (
	// eventlogSinkEnabled switches the sink on or off; it is registered
	// with a value of false.
	eventlogSinkEnabled = settings.RegisterBoolSetting(
		"server.eventlogsink.enabled",
		`set to true to enable system.eventlog entries to be emitted to the node log`,
		false,
	)

	// eventlogSinkPeriod is the interval between two polls of the eventlog
	// table.
	eventlogSinkPeriod = settings.RegisterDurationSetting(
		"server.eventlogsink.period",
		`how frequently should the eventlog table be polled`,
		10*time.Second,
	)

	// eventlogSinkMaxEntries caps the number of events emitted by a single
	// poll.
	eventlogSinkMaxEntries = settings.RegisterIntSetting(
		"server.eventlogsink.max_entries",
		`max number of eventlog entries to emit; set to 0 to emit all`,
		100,
	)

	// eventlogSincIncludeEvents is a regular expression; only events whose
	// type matches it are emitted.
	eventlogSincIncludeEvents = settings.RegisterStringSetting(
		"server.eventlogsink.include_events",
		`extract events matching this regex`,
		".*",
	)

	// eventlogSincExcludeEvents is a regular expression; events whose type
	// matches it are not emitted.
	eventlogSincExcludeEvents = settings.RegisterStringSetting(
		"server.eventlogsink.exclude_events",
		`exclude events matching this regex`,
		"comment|sequence",
	)
)

// getEventlogSinkPeriod returns how long the sink loop should wait before its
// next pass. While the sink is disabled it returns one minute, which is how
// often the enabled setting is rechecked. Otherwise it returns the value of
// server.eventlogsink.period, raised to 100ms when the setting is smaller
// than that.
func getEventlogSinkPeriod(sv *settings.Values) time.Duration {
	const (
		disabledRecheckInterval = time.Minute
		minPollInterval         = 100 * time.Millisecond
	)

	if !eventlogSinkEnabled.Get(sv) {
		return disabledRecheckInterval
	}

	if period := eventlogSinkPeriod.Get(sv); period >= minPollInterval {
		return period
	}
	return minPollInterval
}

// eventLogSink is the destination for events read from system.eventlog. It is
// a thin layer over the log methods so that tests can plug in their own
// implementation.
type eventLogSink interface {
	// ReportEvent receives one event: its timestamp, its type, the reporter
	// and the info payload, the latter three already rendered as strings.
	ReportEvent(ctx context.Context, ts time.Time, event, reporter, info string)
	// Errorf receives a printf-style error message.
	Errorf(ctx context.Context, format string, args ...interface{})
}

// defaultLogSink is the eventLogSink backed by the log package: events are
// written at info level and errors at error level.
type defaultLogSink struct{}

// Compile-time check that defaultLogSink satisfies eventLogSink.
var _ eventLogSink = (*defaultLogSink)(nil)

// ReportEvent writes a single system.eventlog entry to the log.
func (defaultLogSink) ReportEvent(
	ctx context.Context, ts time.Time, event, reporter, info string,
) {
	log.Infof(ctx, "system.eventlog:n=%s:%s:%s %s", reporter, event, ts, info)
}

// Errorf forwards the formatted message to log.Errorf.
func (defaultLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
	log.Errorf(ctx, format, args...)
}

// sinkEventlog emits system.eventlog entries into the node log file.
//
// It reads the events whose timestamp is strictly greater than logTimestamp
// and whose type matches the server.eventlogsink.include_events regex but not
// the server.eventlogsink.exclude_events one, and passes them to sink in
// ascending timestamp order. When server.eventlogsink.max_entries is positive,
// only that many of the most recent matching events are read.
//
// On success it returns the timestamp of the last event handed to sink, or
// logTimestamp when there was none; callers feed that value into the next
// call. On failure it returns logTimestamp unchanged together with the error.
//
// TODO(yevgeniy): Replace implementation with rangefeed based one, once rangefeeds
// support listening to system tables.
func sinkEventlog(
	ctx context.Context,
	sink eventLogSink,
	db *kv.DB,
	ex *sql.InternalExecutor,
	sv *settings.Values,
	logTimestamp time.Time,
) (time.Time, error) {
	maxEntries := eventlogSinkMaxEntries.Get(sv)
	// limit stays nil (no row limit) unless a positive cap is configured.
	var limit interface{}
	if maxEntries > 0 {
		limit = maxEntries
	}
	includeEvents := eventlogSincIncludeEvents.Get(sv)
	excludeEvents := eventlogSincExcludeEvents.Get(sv)

	// Positions of the columns selected by getEventsQuery.
	const (
		tsCol = iota
		eventTypeCol
		reportingIDCol
		infoCol
	)

	// The inner query picks the newest rows (so that LIMIT keeps the most
	// recent events); the outer one puts them back in chronological order.
	const getEventsQuery = `
SELECT timestamp, "eventType", "reportingID", info
FROM [
SELECT * FROM system.eventlog
WHERE
timestamp > $1 AND
regexp_extract("eventType", $2) IS NOT NULL AND -- include events
regexp_extract("eventType", $3) IS NULL -- exclude events
ORDER BY timestamp DESC
LIMIT $4
] ORDER BY timestamp`

	// Only fetch the rows inside the transaction closure. db.Txn may run the
	// closure more than once, so reporting events from within it could emit
	// the same events repeatedly.
	var rows []tree.Datums
	if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		var err error
		rows, err = ex.QueryEx(
			ctx,
			"system.eventlog-sink",
			txn,
			sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
			getEventsQuery,
			logTimestamp,
			includeEvents,
			excludeEvents,
			limit,
		)
		return err
	}); err != nil {
		return logTimestamp, err
	}

	// Hitting the cap exactly suggests older events were cut off. The check
	// only makes sense when a cap is in effect; without the guard a setting
	// of 0 would trigger the message on every pass that finds no rows.
	if maxEntries > 0 && int64(len(rows)) == maxEntries {
		log.Infof(ctx,
			"system.eventlog likely contains more than %d events emitted since %s; "+
				"consider increasing server.eventlogsink.max_entries setting, "+
				"or lowering the server.eventlogsink.period",
			maxEntries, logTimestamp,
		)
	}

	maxTs := logTimestamp
	for _, row := range rows {
		ts, ok := row[tsCol].(*tree.DTimestamp)
		if !ok {
			// Report the type of the offending column, not of the whole row.
			return logTimestamp, errors.Errorf("timestamp is of unknown type %T", row[tsCol])
		}

		// Rows arrive in ascending timestamp order, so the last one seen is
		// the maximum.
		maxTs = ts.Time
		sink.ReportEvent(
			ctx, maxTs, row[eventTypeCol].String(), row[reportingIDCol].String(), row[infoCol].String())
	}

	return maxTs, nil
}

// startEventlogSink launches a stopper worker that periodically copies new
// system.eventlog entries into the node log. The worker wakes up after the
// interval given by getEventlogSinkPeriod, does nothing if the sink is
// disabled, and exits when the stopper signals ShouldStop. Only events newer
// than the moment this method was called are considered.
func (s *Server) startEventlogSink(ctx context.Context) {
	// High-water mark: timestamp of the most recent event already emitted.
	lastEmitted := time.Now()

	s.stopper.RunWorker(ctx, func(ctx context.Context) {
		settingValues := &s.ClusterSettings().SV
		for {
			select {
			case <-s.stopper.ShouldStop():
				return
			case <-time.After(getEventlogSinkPeriod(settingValues)):
			}

			if !eventlogSinkEnabled.Get(settingValues) {
				continue
			}

			nextTs, err := sinkEventlog(
				ctx, defaultLogSink{}, s.db, s.internalExecutor, settingValues, lastEmitted)
			if err != nil {
				// Keep the old mark so the same window is retried next time.
				log.Errorf(ctx, "failed to sink system.eventlog records: %v", err)
				continue
			}
			lastEmitted = nextTs
		}
	})
}
125 changes: 125 additions & 0 deletions pkg/server/server_systemlog_logsink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// testLogSink is an eventLogSink that records what it receives instead of
// logging it, so that a test can inspect the outcome of a sink pass.
type testLogSink struct {
	server *TestServer
	db     *sql.DB
	// events holds the event types passed to ReportEvent, in call order.
	events []string
	// haveErr is set once Errorf has been called.
	haveErr bool
}

// Compile-time check that testLogSink satisfies eventLogSink.
var _ eventLogSink = (*testLogSink)(nil)

// ReportEvent remembers the event type; the other arguments are ignored.
func (s *testLogSink) ReportEvent(
	ctx context.Context, ts time.Time, event, reporter, info string,
) {
	s.events = append(s.events, event)
}

// Errorf only notes that an error was reported.
func (s *testLogSink) Errorf(ctx context.Context, format string, args ...interface{}) {
	s.haveErr = true
}

// doSink clears the recorded state and runs one sinkEventlog pass against the
// test server, reading events newer than ts. It returns sinkEventlog's result
// unchanged.
func (s *testLogSink) doSink(ts time.Time) (time.Time, error) {
	s.events, s.haveErr = nil, false

	srv := s.server
	return sinkEventlog(
		context.Background(), s, srv.db, srv.internalExecutor, &srv.ClusterSettings().SV, ts)
}

// addEvents inserts num rows into system.eventlog through sqlDB. Row i gets
// the event type "test_event-<i>", uses i as both targetID and reportingID,
// and is stamped one second after the previous row, the first one being one
// second after startTs. It returns the timestamp of the last row inserted, or
// startTs itself if no row was inserted.
func addEvents(t *testing.T, sqlDB *sqlutils.SQLRunner, startTs time.Time, num int) time.Time {
	const insertEvent = `INSERT INTO system.eventlog (
"timestamp", "eventType", "targetID", "reportingID"
) VALUES ($1, $2, $3, $4)`

	lastTs := startTs
	for i := 0; i < num; i++ {
		lastTs = lastTs.Add(time.Second)
		eventType := fmt.Sprintf("test_event-%d", i)
		sqlDB.Exec(t, insertEvent, lastTs, eventType, i, i)
	}
	return lastTs
}

// TestSystemEventLogSink exercises sinkEventlog against a test server: it
// inserts synthetic events into system.eventlog and checks which of them are
// reported, and which timestamp is returned, under different values of the
// max_entries, include_events and exclude_events settings.
func TestSystemEventLogSink(t *testing.T) {
	defer leaktest.AfterTest(t)()
	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
	sqlDB := sqlutils.MakeSQLRunner(db)
	ts := s.(*TestServer)
	ctx := context.Background()
	defer s.Stopper().Stop(ctx)

	sink := &testLogSink{server: ts, db: db}

	// Write some events to the event table.
	startTs := timeutil.Now()
	expectedEndTs := addEvents(t, sqlDB, startTs, 10)

	// Scan all entries.
	_ = sqlDB.Exec(t, "set cluster setting server.eventlogsink.max_entries=-1")

	// Since 'set cluster setting' also logs to the events table,
	// arrange only for 'test_event' events to be picked up.
	_ = sqlDB.Exec(t, "set cluster setting server.eventlogsink.include_events='test_event'")

	// Verify we have 10 events.
	endTs, err := sink.doSink(startTs)
	require.NoError(t, err)
	require.Equal(t, expectedEndTs, endTs)
	require.Equal(t, 10, len(sink.events))
	require.False(t, sink.haveErr)

	// Limit the number of events.
	_ = sqlDB.Exec(t, "set cluster setting server.eventlogsink.max_entries=3")

	// Verify we get 3 latest events and ignore 7 older ones.
	endTs, err = sink.doSink(startTs)
	require.NoError(t, err)
	require.Equal(t, expectedEndTs, endTs)
	require.False(t, sink.haveErr)
	require.Equal(t, []string{"'test_event-7'", "'test_event-8'", "'test_event-9'"}, sink.events)

	// Exclude test_event 7 & 9
	_ = sqlDB.Exec(t, "set cluster setting server.eventlogsink.exclude_events='test_event-(7|9)'")
	endTs, err = sink.doSink(startTs)
	require.NoError(t, err)
	require.Equal(t, []string{"'test_event-5'", "'test_event-6'", "'test_event-8'"}, sink.events)
	require.Equal(t, startTs.Add(9*time.Second), endTs)
	require.False(t, sink.haveErr)

	// Query events starting from the previous endTs -- nothing should be returned.
	oldEndTs := endTs
	endTs, err = sink.doSink(oldEndTs)
	require.NoError(t, err)
	require.EqualValues(t, oldEndTs, endTs)
	require.Nil(t, sink.events)

	// Add a new event and make sure it is retrieved.
	expectedEndTs = addEvents(t, sqlDB, endTs, 1)
	endTs, err = sink.doSink(endTs)
	require.NoError(t, err)
	require.EqualValues(t, []string{"'test_event-0'"}, sink.events)
	// The returned timestamp must have advanced to the newly added event.
	require.EqualValues(t, expectedEndTs, endTs)
	require.False(t, sink.haveErr)
}

0 comments on commit 5c55957

Please sign in to comment.